From 530b3b6484efdc31b2884650aabbbbd1b8621eae Mon Sep 17 00:00:00 2001 From: "Samuel Filho (WIPRO LIMITED)" Date: Mon, 5 Jan 2026 19:59:15 -0300 Subject: [PATCH 1/3] Add Elasticsearch Rally workload artifacts --- VERSION | 2 +- .../ElasticsearchRallyClientExecutorTests.cs | 257 ++++++++++ .../ElasticsearchRallyServerExecutorTests.cs | 163 +++++++ .../ElasticsearchRallyExample.txt | 458 ++++++++++++++++++ .../ElasticsearchMetricReader.cs | 90 ++++ .../ElasticsearchRallyBaseExecutor.cs | 278 +++++++++++ .../ElasticsearchRallyClientExecutor.cs | 366 ++++++++++++++ .../ElasticsearchRallyServerExecutor.cs | 185 +++++++ .../ElasticsearchRallyState.cs | 44 ++ .../ElasticsearchRally/elasticsearch.ini | 122 +++++ .../ElasticsearchRally/limits.ini | 76 +++ .../VirtualClient.Actions.csproj | 1 + .../profiles/PERF-ELASTICSEARCH-RALLY.json | 103 ++++ 13 files changed, 2144 insertions(+), 1 deletion(-) create mode 100644 src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyClientExecutorTests.cs create mode 100644 src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyServerExecutorTests.cs create mode 100644 src/VirtualClient/VirtualClient.Actions.UnitTests/Examples/ElasticsearchRally/ElasticsearchRallyExample.txt create mode 100644 src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchMetricReader.cs create mode 100644 src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyBaseExecutor.cs create mode 100644 src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyClientExecutor.cs create mode 100644 src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyServerExecutor.cs create mode 100644 src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyState.cs create mode 100644 src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/elasticsearch.ini create mode 100644 src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/limits.ini create mode 100644 src/VirtualClient/VirtualClient.Main/profiles/PERF-ELASTICSEARCH-RALLY.json diff --git a/VERSION b/VERSION index 6754f7424d..98d723348b 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.1.40 \ No newline at end of file +2.1.50 \ No newline at end of file diff --git a/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyClientExecutorTests.cs b/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyClientExecutorTests.cs new file mode 100644 index 0000000000..32c1c51ae0 --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyClientExecutorTests.cs @@ -0,0 +1,257 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace VirtualClient.Actions +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Net.Http; + using System.Reflection; + using System.Threading; + using System.Threading.Tasks; + using CRC.VirtualClient.Actions; + using Microsoft.Extensions.DependencyInjection; + using Moq; + using Newtonsoft.Json.Linq; + using NUnit.Framework; + using Polly; + using VirtualClient.Common.Telemetry; + using VirtualClient.Contracts; + + [TestFixture] + [Category("Unit")] + public class ElasticsearchRallyClientExecutorTests : MockFixture + { + private IEnumerable disks; + [SetUp] + public void SetupTest() + { + this.Setup(PlatformID.Unix); + + this.File.Reset(); + this.File.Setup(f => f.Exists(It.IsAny())) + .Returns(true); + this.Directory.Setup(f => f.Exists(It.IsAny())) + .Returns(true); + this.FileSystem.SetupGet(fs => fs.File).Returns(this.File.Object); + + string agentId = $"{Environment.MachineName}"; + this.SystemManagement.SetupGet(obj => obj.AgentId).Returns(agentId); + + this.disks = this.CreateDisks(PlatformID.Unix, true); + + this.DiskManager.Setup(mgr => mgr.GetDisksAsync(It.IsAny())).ReturnsAsync(() => this.disks); + + this.ApiClient.Setup(client => client.GetHeartbeatAsync(It.IsAny(), It.IsAny>())) + .ReturnsAsync(this.CreateHttpResponse(System.Net.HttpStatusCode.OK)); + + this.ApiClient.Setup(client => client.GetServerOnlineStatusAsync(It.IsAny(), It.IsAny>())) + .ReturnsAsync(this.CreateHttpResponse(System.Net.HttpStatusCode.OK)); + + this.Parameters = new Dictionary() + { + { nameof(ElasticsearchRallyClientExecutor.DiskFilter), "osdisk:false&biggestsize" }, + { nameof(ElasticsearchRallyClientExecutor.DistributionVersion), "8.0.0" }, + { nameof(ElasticsearchRallyClientExecutor.Port), "9200" }, + { nameof(ElasticsearchRallyClientExecutor.RallyTestMode), true }, + { nameof(ElasticsearchRallyClientExecutor.Scenario), "ExecuteGeoNamesBenchmark" }, + { nameof(ElasticsearchRallyClientExecutor.TrackName), "geonames" }, + }; + } + + [Test] + [TestCase(false, false)] + [TestCase(false, true)] + [TestCase(true, true)] + public void TestElasticsearchRallyClientExecutorWhenReportNotGenerated(bool rallyConfigured, bool serverAvailable) + { + SetupTest(); + + this.StateManager.OnGetState().ReturnsAsync(JObject.FromObject(new ElasticsearchRallyState() + { + RallyConfigured = rallyConfigured, + })); + + using (TestElasticsearchRallyClientExecutor executor = new TestElasticsearchRallyClientExecutor(this.Dependencies, this.Parameters)) + { + executor.ServerAvailable = serverAvailable; + Assert.ThrowsAsync(() => executor.ExecuteAsync(EventContext.None, CancellationToken.None)); + } + } + + [Test] + public void TestElasticsearchRallyClientExecutorWhenRallyConfiguredAndReportFailure() + { + SetupTest(); + + this.StateManager.OnGetState().ReturnsAsync(JObject.FromObject(new ElasticsearchRallyState() + { + RallyConfigured = true, + })); + + bool commandExecuted = false; + + using (TestElasticsearchRallyClientExecutor executor = new TestElasticsearchRallyClientExecutor(this.Dependencies, this.Parameters)) + { + executor.OnRunCommand = (command, arguments) => + { + if (command == "/usr/bin/sudo" && arguments.Contains("python3 -m pipx run esrally race")) + { + commandExecuted = true; + } + }; + + executor.ReportCsvExists = true; + + Assert.ThrowsAsync(() => executor.ExecuteAsync(EventContext.None, CancellationToken.None)); + } + + Assert.IsTrue(commandExecuted); + } + + [Test] + public void TestElasticsearchRallyClientExecutorWhenRallyConfiguredAndReportWithInsufficientData() + { + SetupTest(); + + bool logMessageCaptured = false; + + // Use VirtualClient's LogMessage extension method + this.Logger.OnLog = (level, eventId, state, exception) => + { + if (eventId.Name.Contains("RallyReportCsvInsufficientData")) + { + logMessageCaptured = true; + } + }; + + this.StateManager.OnGetState().ReturnsAsync(JObject.FromObject(new ElasticsearchRallyState() + { + RallyConfigured = true, + })); + + bool commandExecuted = false; + + using (TestElasticsearchRallyClientExecutor executor = new TestElasticsearchRallyClientExecutor(this.Dependencies, this.Parameters)) + { + executor.OnRunCommand = (command, arguments) => + { + if (command == "/usr/bin/sudo" && arguments.Contains("python3 -m pipx run esrally race")) + { + commandExecuted = true; + } + }; + + executor.ReportCsvExists = true; + executor.ReportLines = new string[] + { + "Metric,Task,Value,Unit" + }; + + executor.ExecuteAsync(EventContext.None, CancellationToken.None); + } + + Assert.IsTrue(logMessageCaptured); + Assert.IsTrue(commandExecuted); + } + + [Test] + public void TestElasticsearchRallyClientExecutorWhenRallyMetricReaderFailure() + { + SetupTest(); + + this.Parameters.Remove(nameof(ElasticsearchRallyClientExecutor.Scenario)); + + this.StateManager.OnGetState().ReturnsAsync(JObject.FromObject(new ElasticsearchRallyState() + { + RallyConfigured = true, + })); + + using (TestElasticsearchRallyClientExecutor executor = new TestElasticsearchRallyClientExecutor(this.Dependencies, this.Parameters)) + { + executor.ReportCsvExists = true; + string currentDirectory = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); + executor.ReportLines = System.IO.File.ReadAllLines(Path.Combine(currentDirectory, "Examples", "ElasticsearchRally", "ElasticsearchRallyExample.txt")); + + Assert.ThrowsAsync(() => executor.ExecuteAsync(EventContext.None, CancellationToken.None)); + + } + } + [Test] + public async Task TestElasticsearchRallyClientExecutorWhenRallyConfiguredAndReportGenerated() + { + SetupTest(); + + this.StateManager.OnGetState().ReturnsAsync(JObject.FromObject(new ElasticsearchRallyState() + { + RallyConfigured = true, + })); + + bool commandExecuted = false; + + using (TestElasticsearchRallyClientExecutor executor = new TestElasticsearchRallyClientExecutor(this.Dependencies, this.Parameters)) + { + executor.OnRunCommand = (command, arguments) => + { + if (command == "/usr/bin/sudo" && arguments.Contains("python3 -m pipx run esrally race")) + { + commandExecuted = true; + } + }; + + executor.ReportCsvExists = true; + string currentDirectory = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); + executor.ReportLines = System.IO.File.ReadAllLines(Path.Combine(currentDirectory, "Examples", "ElasticsearchRally", "ElasticsearchRallyExample.txt")); + + await executor.ExecuteAsync(EventContext.None, CancellationToken.None); + } + + Assert.IsTrue(commandExecuted); + } + + private class TestElasticsearchRallyClientExecutor : ElasticsearchRallyClientExecutor + { + public Action OnRunCommand { get; set; } + public bool ServerAvailable { get; set; } + public bool ReportCsvExists { get; set; } + public string[] ReportLines { get; set; } + + public TestElasticsearchRallyClientExecutor(IServiceCollection dependencies, IDictionary parameters) + : base(dependencies, parameters) + { + } + + public new Task ExecuteAsync(EventContext context, CancellationToken cancellationToken) + { + return base.ExecuteAsync(context, cancellationToken); + } + + protected override bool RunCommand(string command, string arguments, out string output, out string error) + { + output = string.Empty; + error = string.Empty; + + OnRunCommand?.Invoke(command, arguments); + + return true; + } + + protected override bool CheckServerAvailable(EventContext telemetryContext, string targetHost, int port, int timeout) + { + return this.ServerAvailable; + } + + protected override bool CheckFileExists(string path) + { + return this.ReportCsvExists; + } + + override protected string[] ReadReportLines(string reportPath) + { + return this.ReportLines; + } + } + + } +} diff --git a/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyServerExecutorTests.cs b/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyServerExecutorTests.cs new file mode 100644 index 0000000000..0f96c8f4bd --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyServerExecutorTests.cs @@ -0,0 +1,163 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace VirtualClient.Actions +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Net.Http; + using System.Reflection; + using System.Threading; + using System.Threading.Tasks; + using CRC.VirtualClient.Actions; + using Microsoft.Extensions.DependencyInjection; + using Moq; + using Newtonsoft.Json.Linq; + using NUnit.Framework; + using Polly; + using VirtualClient.Common.Telemetry; + using VirtualClient.Contracts; + + [TestFixture] + [Category("Unit")] + public class ElasticsearchRallyServerExecutorTests : MockFixture + { + private IEnumerable disks; + [SetUp] + public void SetupTest() + { + this.Setup(PlatformID.Unix); + + this.File.Reset(); + this.File.Setup(f => f.Exists(It.IsAny())) + .Returns(true); + this.Directory.Setup(f => f.Exists(It.IsAny())) + .Returns(true); + this.FileSystem.SetupGet(fs => fs.File).Returns(this.File.Object); + + string agentId = $"{Environment.MachineName}"; + this.SystemManagement.SetupGet(obj => obj.AgentId).Returns(agentId); + + this.disks = this.CreateDisks(PlatformID.Unix, true); + + this.DiskManager.Setup(mgr => mgr.GetDisksAsync(It.IsAny())).ReturnsAsync(() => this.disks); + + this.ApiClient.Setup(client => client.GetHeartbeatAsync(It.IsAny(), It.IsAny>())) + .ReturnsAsync(this.CreateHttpResponse(System.Net.HttpStatusCode.OK)); + + this.ApiClient.Setup(client => client.GetServerOnlineStatusAsync(It.IsAny(), It.IsAny>())) + .ReturnsAsync(this.CreateHttpResponse(System.Net.HttpStatusCode.OK)); + + this.Parameters = new Dictionary() + { + { nameof(ElasticsearchRallyServerExecutor.DiskFilter), "osdisk:false&biggestsize" }, + { nameof(ElasticsearchRallyServerExecutor.Port), "9200" }, + { nameof(ElasticsearchRallyServerExecutor.PackageName), "elasticsearchrally" }, + }; + } + + [Test] + public void TestElasticsearchRallyServerExecutorInitializeYmlNotFound() + { + SetupTest(); + + bool commandExecuted = false; + + using (TestElasticsearchRallyServerExecutor executor = new TestElasticsearchRallyServerExecutor(this.Dependencies, this.Parameters)) + { + executor.OnRunCommand = (command, arguments) => + { + commandExecuted = true; + }; + + Assert.ThrowsAsync(() => executor.InitializeAsync(EventContext.None, CancellationToken.None)); + } + + Assert.IsTrue(commandExecuted); + } + + [Test] + public async Task TestElasticsearchRallyServerExecutorInitialize() + { + SetupTest(); + + bool commandExecuted = false; + + using (TestElasticsearchRallyServerExecutor executor = new TestElasticsearchRallyServerExecutor(this.Dependencies, this.Parameters)) + { + executor.OnRunCommand = (command, arguments) => + { + commandExecuted = true; + }; + + executor.FileExists = true; + await executor.InitializeAsync(EventContext.None, CancellationToken.None); + } + + Assert.IsTrue(commandExecuted); + } + + [Test] + public async Task TestElasticsearchRallyServerExecutorExpectedRun() + { + SetupTest(); + + bool commandExecuted = false; + + using (TestElasticsearchRallyServerExecutor executor = new TestElasticsearchRallyServerExecutor(this.Dependencies, this.Parameters)) + { + await executor.ExecuteAsync(EventContext.None, CancellationToken.None); + commandExecuted = true; + } + + Assert.IsTrue(commandExecuted); + } + + private class TestElasticsearchRallyServerExecutor : ElasticsearchRallyServerExecutor + { + public Action OnRunCommand { get; set; } + public bool FileExists { get; set; } + public TestElasticsearchRallyServerExecutor(IServiceCollection dependencies, IDictionary parameters) + : base(dependencies, parameters) + { + } + + public new Task InitializeAsync(EventContext context, CancellationToken cancellationToken) + { + return base.InitializeAsync(context, cancellationToken); + } + + public new Task ExecuteAsync(EventContext context, CancellationToken cancellationToken) + { + return base.ExecuteAsync(context, cancellationToken); + } + + protected override bool RunCommand(string command, string arguments, out string output, out string error) + { + output = string.Empty; + error = string.Empty; + + OnRunCommand?.Invoke(command, arguments); + + return true; + } + + protected override bool CheckFileExists(string path) + { + return this.FileExists; + } + + protected override void WriteAllText(string path, string content) + { + + } + + protected override string ReadAllText(string path) + { + return "sample text"; + } + } + + } +} diff --git a/src/VirtualClient/VirtualClient.Actions.UnitTests/Examples/ElasticsearchRally/ElasticsearchRallyExample.txt b/src/VirtualClient/VirtualClient.Actions.UnitTests/Examples/ElasticsearchRally/ElasticsearchRallyExample.txt new file mode 100644 index 0000000000..e141d35f5d --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions.UnitTests/Examples/ElasticsearchRally/ElasticsearchRallyExample.txt @@ -0,0 +1,458 @@ +Metric,Task,Value,Unit +Cumulative indexing time of primary shards,,0.011383333333333334,min +Min cumulative indexing time across primary shards,,0.0015833333333333333,min +Median cumulative indexing time across primary shards,,0.00175,min +Max cumulative indexing time across primary shards,,0.0031833333333333336,min +Cumulative indexing throttle time of primary shards,,0,min +Min cumulative indexing throttle time across primary shards,,0,min +Median cumulative indexing throttle time across primary shards,,0,min +Max cumulative indexing throttle time across primary shards,,0,min +Cumulative merge time of primary shards,,0,min +Cumulative merge count of primary shards,,0, +Min cumulative merge time across primary shards,,0,min +Median cumulative merge time across primary shards,,0,min +Max cumulative merge time across primary shards,,0,min +Cumulative merge throttle time of primary shards,,0,min +Min cumulative merge throttle time across primary shards,,0,min +Median cumulative merge throttle time across primary shards,,0,min +Max cumulative merge throttle time across primary shards,,0,min +Cumulative refresh time of primary shards,,0.004366666666666667,min +Cumulative refresh count of primary shards,,35, +Min cumulative refresh time across primary shards,,0.0005333333333333334,min +Median cumulative refresh time across primary shards,,0.0006166666666666666,min +Max cumulative refresh time across primary shards,,0.0018833333333333334,min +Cumulative flush time of primary shards,,0,min +Cumulative flush count of primary shards,,0, +Min cumulative flush time across primary shards,,0,min +Median cumulative flush time across primary shards,,0,min +Max cumulative flush time across primary shards,,0,min +Total Young Gen GC time,,0,s +Total Young Gen GC count,,0, +Total Old Gen GC time,,0,s +Total Old Gen GC count,,0, +Dataset size,,0.00031846389174461365,GB +Store size,,0.00031846389174461365,GB +Translog size,,2.561137080192566e-07,GB +Heap used for segments,,0,MB +Heap used for doc values,,0,MB +Heap used for terms,,0,MB +Heap used for norms,,0,MB +Heap used for points,,0,MB +Heap used for stored fields,,0,MB +Segment count,,5, +Total Ingest Pipeline count,,0, +Total Ingest Pipeline time,,0,s +Total Ingest Pipeline failed,,0, +Min Throughput,index-append,3136.69,docs/s +Mean Throughput,index-append,3136.69,docs/s +Median Throughput,index-append,3136.69,docs/s +Max Throughput,index-append,3136.69,docs/s +50th percentile latency,index-append,411.4830879999545,ms +100th percentile latency,index-append,493.50839100009125,ms +50th percentile service time,index-append,411.4830879999545,ms +100th percentile service time,index-append,493.50839100009125,ms +error rate,index-append,0.00,% +Min Throughput,index-stats,106.55,ops/s +Mean Throughput,index-stats,106.55,ops/s +Median Throughput,index-stats,106.55,ops/s +Max Throughput,index-stats,106.55,ops/s +100th percentile latency,index-stats,17.315318999976625,ms +100th percentile service time,index-stats,7.617430999971475,ms +error rate,index-stats,0.00,% +Min Throughput,node-stats,35.37,ops/s +Mean Throughput,node-stats,35.37,ops/s +Median Throughput,node-stats,35.37,ops/s +Max Throughput,node-stats,35.37,ops/s +100th percentile latency,node-stats,41.9795630000408,ms +100th percentile service time,node-stats,13.148037000064505,ms +error rate,node-stats,0.00,% +Min Throughput,default,10.86,ops/s +Mean Throughput,default,10.86,ops/s +Median Throughput,default,10.86,ops/s +Max Throughput,default,10.86,ops/s +100th percentile latency,default,105.78448999990542,ms +100th percentile service time,default,13.30666100000144,ms +error rate,default,0.00,% +Min Throughput,term,79.80,ops/s +Mean Throughput,term,79.80,ops/s +Median Throughput,term,79.80,ops/s +Max Throughput,term,79.80,ops/s +100th percentile latency,term,23.220744000013838,ms +100th percentile service time,term,10.408558999984052,ms +error rate,term,0.00,% +Min Throughput,phrase,57.76,ops/s +Mean Throughput,phrase,57.76,ops/s +Median Throughput,phrase,57.76,ops/s +Max Throughput,phrase,57.76,ops/s +100th percentile latency,phrase,24.33359899998777,ms +100th percentile service time,phrase,6.615323000005446,ms +error rate,phrase,0.00,% +Min Throughput,country_agg_uncached,16.73,ops/s +Mean Throughput,country_agg_uncached,16.73,ops/s +Median Throughput,country_agg_uncached,16.73,ops/s +Max Throughput,country_agg_uncached,16.73,ops/s +100th percentile latency,country_agg_uncached,72.52013200002239,ms +100th percentile service time,country_agg_uncached,12.335570000004736,ms +error rate,country_agg_uncached,0.00,% +Min Throughput,country_agg_cached,64.70,ops/s +Mean Throughput,country_agg_cached,64.70,ops/s +Median Throughput,country_agg_cached,64.70,ops/s +Max Throughput,country_agg_cached,64.70,ops/s +100th percentile latency,country_agg_cached,39.73352600007729,ms +100th percentile service time,country_agg_cached,23.965268999972977,ms +error rate,country_agg_cached,0.00,% +Min Throughput,scroll,14.43,pages/s +Mean Throughput,scroll,14.43,pages/s +Median Throughput,scroll,14.43,pages/s +Max Throughput,scroll,14.43,pages/s +100th percentile latency,scroll,185.59198600007676,ms +100th percentile service time,scroll,46.108054000001175,ms +error rate,scroll,0.00,% +Min Throughput,expression,9.45,ops/s +Mean Throughput,expression,9.45,ops/s +Median Throughput,expression,9.45,ops/s +Max Throughput,expression,9.45,ops/s +100th percentile latency,expression,115.3483429999369,ms +100th percentile service time,expression,9.098823999920569,ms +error rate,expression,0.00,% +Min Throughput,painless_static,9.91,ops/s +Mean Throughput,painless_static,9.91,ops/s +Median Throughput,painless_static,9.91,ops/s +Max Throughput,painless_static,9.91,ops/s +100th percentile latency,painless_static,113.48900499990577,ms +100th percentile service time,painless_static,12.192466999977114,ms +error rate,painless_static,0.00,% +Min Throughput,painless_dynamic,19.61,ops/s +Mean Throughput,painless_dynamic,19.61,ops/s +Median Throughput,painless_dynamic,19.61,ops/s +Max Throughput,painless_dynamic,19.61,ops/s +100th percentile latency,painless_dynamic,64.49730999997882,ms +100th percentile service time,painless_dynamic,12.27088600001025,ms +error rate,painless_dynamic,0.00,% +Min Throughput,decay_geo_gauss_function_score,41.41,ops/s +Mean Throughput,decay_geo_gauss_function_score,41.41,ops/s +Median Throughput,decay_geo_gauss_function_score,41.41,ops/s +Max Throughput,decay_geo_gauss_function_score,41.41,ops/s +100th percentile latency,decay_geo_gauss_function_score,36.979236999968634,ms +100th percentile service time,decay_geo_gauss_function_score,12.484946999961721,ms +error rate,decay_geo_gauss_function_score,0.00,% +Min Throughput,decay_geo_gauss_script_score,33.34,ops/s +Mean Throughput,decay_geo_gauss_script_score,33.34,ops/s +Median Throughput,decay_geo_gauss_script_score,33.34,ops/s +Max Throughput,decay_geo_gauss_script_score,33.34,ops/s +100th percentile latency,decay_geo_gauss_script_score,42.514687999982925,ms +100th percentile service time,decay_geo_gauss_script_score,12.168380000048273,ms +error rate,decay_geo_gauss_script_score,0.00,% +Min Throughput,field_value_function_score,57.88,ops/s +Mean Throughput,field_value_function_score,57.88,ops/s +Median Throughput,field_value_function_score,57.88,ops/s +Max Throughput,field_value_function_score,57.88,ops/s +100th percentile latency,field_value_function_score,29.312013999970077,ms +100th percentile service time,field_value_function_score,11.531850999972448,ms +error rate,field_value_function_score,0.00,% +Min Throughput,field_value_script_score,38.28,ops/s +Mean Throughput,field_value_script_score,38.28,ops/s +Median Throughput,field_value_script_score,38.28,ops/s +Max Throughput,field_value_script_score,38.28,ops/s +100th percentile latency,field_value_script_score,36.996438000073795,ms +100th percentile service time,field_value_script_score,10.143301000084648,ms +error rate,field_value_script_score,0.00,% +Min Throughput,large_terms,1.70,ops/s +Mean Throughput,large_terms,1.70,ops/s +Median Throughput,large_terms,1.70,ops/s +Max Throughput,large_terms,1.70,ops/s +100th percentile latency,large_terms,839.6622349999916,ms +100th percentile service time,large_terms,244.41395700000612,ms +error rate,large_terms,0.00,% +Min Throughput,large_filtered_terms,8.92,ops/s +Mean Throughput,large_filtered_terms,8.92,ops/s +Median Throughput,large_filtered_terms,8.92,ops/s +Max Throughput,large_filtered_terms,8.92,ops/s +100th percentile latency,large_filtered_terms,217.69280099999833,ms +100th percentile service time,large_filtered_terms,98.75722699996459,ms +error rate,large_filtered_terms,0.00,% +Min Throughput,large_prohibited_terms,6.94,ops/s +Mean Throughput,large_prohibited_terms,6.94,ops/s +Median Throughput,large_prohibited_terms,6.94,ops/s +Max Throughput,large_prohibited_terms,6.94,ops/s +100th percentile latency,large_prohibited_terms,250.7793090001087,ms +100th percentile service time,large_prohibited_terms,100.76048100006574,ms +error rate,large_prohibited_terms,0.00,% +Min Throughput,desc_sort_population,30.44,ops/s +Mean Throughput,desc_sort_population,30.44,ops/s +Median Throughput,desc_sort_population,30.44,ops/s +Max Throughput,desc_sort_population,30.44,ops/s +100th percentile latency,desc_sort_population,45.147679000024254,ms +100th percentile service time,desc_sort_population,11.996705999990809,ms +error rate,desc_sort_population,0.00,% +Min Throughput,asc_sort_population,90.59,ops/s +Mean Throughput,asc_sort_population,90.59,ops/s +Median Throughput,asc_sort_population,90.59,ops/s +Max Throughput,asc_sort_population,90.59,ops/s +100th percentile latency,asc_sort_population,20.246841000016502,ms +100th percentile service time,asc_sort_population,8.775173999993058,ms +error rate,asc_sort_population,0.00,% +Min Throughput,asc_sort_with_after_population,73.41,ops/s +Mean Throughput,asc_sort_with_after_population,73.41,ops/s +Median Throughput,asc_sort_with_after_population,73.41,ops/s +Max Throughput,asc_sort_with_after_population,73.41,ops/s +100th percentile latency,asc_sort_with_after_population,19.789750999962052,ms +100th percentile service time,asc_sort_with_after_population,5.837562000010621,ms +error rate,asc_sort_with_after_population,0.00,% +Min Throughput,desc_sort_geonameid,60.90,ops/s +Mean Throughput,desc_sort_geonameid,60.90,ops/s +Median Throughput,desc_sort_geonameid,60.90,ops/s +Max Throughput,desc_sort_geonameid,60.90,ops/s +100th percentile latency,desc_sort_geonameid,26.10080299996298,ms +100th percentile service time,desc_sort_geonameid,9.38935399994989,ms +error rate,desc_sort_geonameid,0.00,% +Min Throughput,desc_sort_with_after_geonameid,76.25,ops/s +Mean Throughput,desc_sort_with_after_geonameid,76.25,ops/s +Median Throughput,desc_sort_with_after_geonameid,76.25,ops/s +Max Throughput,desc_sort_with_after_geonameid,76.25,ops/s +100th percentile latency,desc_sort_with_after_geonameid,26.196819000006144,ms +100th percentile service time,desc_sort_with_after_geonameid,12.640498999985539,ms +error rate,desc_sort_with_after_geonameid,0.00,% +Min Throughput,asc_sort_geonameid,65.13,ops/s +Mean Throughput,asc_sort_geonameid,65.13,ops/s +Median Throughput,asc_sort_geonameid,65.13,ops/s +Max Throughput,asc_sort_geonameid,65.13,ops/s +100th percentile latency,asc_sort_geonameid,23.98946799996793,ms +100th percentile service time,asc_sort_geonameid,8.35734199995386,ms +error rate,asc_sort_geonameid,0.00,% +Min Throughput,asc_sort_with_after_geonameid,110.72,ops/s +Mean Throughput,asc_sort_with_after_geonameid,110.72,ops/s +Median Throughput,asc_sort_with_after_geonameid,110.72,ops/s +Max Throughput,asc_sort_with_after_geonameid,110.72,ops/s +100th percentile latency,asc_sort_with_after_geonameid,13.891357000034077,ms +100th percentile service time,asc_sort_with_after_geonameid,4.56060500005151,ms +error rate,asc_sort_with_after_geonameid,0.00,% +Metric,Task,Value,Unit +Cumulative indexing time of primary shards,,0.0036666666666666666,min +Min cumulative indexing time across primary shards,,0.0005166666666666667,min +Median cumulative indexing time across primary shards,,0.0007833333333333334,min +Max cumulative indexing time across primary shards,,0.0008333333333333334,min +Cumulative indexing throttle time of primary shards,,0,min +Min cumulative indexing throttle time across primary shards,,0,min +Median cumulative indexing throttle time across primary shards,,0,min +Max cumulative indexing throttle time across primary shards,,0,min +Cumulative merge time of primary shards,,0,min +Cumulative merge count of primary shards,,0, +Min cumulative merge time across primary shards,,0,min +Median cumulative merge time across primary shards,,0,min +Max cumulative merge time across primary shards,,0,min +Cumulative merge throttle time of primary shards,,0,min +Min cumulative merge throttle time across primary shards,,0,min +Median cumulative merge throttle time across primary shards,,0,min +Max cumulative merge throttle time across primary shards,,0,min +Cumulative refresh time of primary shards,,0.0024333333333333334,min +Cumulative refresh count of primary shards,,30, +Min cumulative refresh time across primary shards,,0.00035,min +Median cumulative refresh time across primary shards,,0.0005166666666666667,min +Max cumulative refresh time across primary shards,,0.0006,min +Cumulative flush time of primary shards,,0,min +Cumulative flush count of primary shards,,0, +Min cumulative flush time across primary shards,,0,min +Median cumulative flush time across primary shards,,0,min +Max cumulative flush time across primary shards,,0,min +Total Young Gen GC time,,0,s +Total Young Gen GC count,,0, +Total Old Gen GC time,,0,s +Total Old Gen GC count,,0, +Dataset size,,0.0003236671909689903,GB +Store size,,0.0003236671909689903,GB +Translog size,,2.561137080192566e-07,GB +Heap used for segments,,0,MB +Heap used for doc values,,0,MB +Heap used for terms,,0,MB +Heap used for norms,,0,MB +Heap used for points,,0,MB +Heap used for stored fields,,0,MB +Segment count,,5, +Total Ingest Pipeline count,,0, +Total Ingest Pipeline time,,0,s +Total Ingest Pipeline failed,,0, +Min Throughput,index-append,9476.96,docs/s +Mean Throughput,index-append,9476.96,docs/s +Median Throughput,index-append,9476.96,docs/s +Max Throughput,index-append,9476.96,docs/s +50th percentile latency,index-append,135.9809470000073,ms +100th percentile latency,index-append,198.33244400001604,ms +50th percentile service time,index-append,135.9809470000073,ms +100th percentile service time,index-append,198.33244400001604,ms +error rate,index-append,0.00,% +Min Throughput,index-stats,120.09,ops/s +Mean Throughput,index-stats,120.09,ops/s +Median Throughput,index-stats,120.09,ops/s +Max Throughput,index-stats,120.09,ops/s +100th percentile latency,index-stats,12.128651000011814,ms +100th percentile service time,index-stats,3.522139999972751,ms +error rate,index-stats,0.00,% +Min Throughput,node-stats,61.95,ops/s +Mean Throughput,node-stats,61.95,ops/s +Median Throughput,node-stats,61.95,ops/s +Max Throughput,node-stats,61.95,ops/s +100th percentile latency,node-stats,24.29407399995398,ms +100th percentile service time,node-stats,7.514713999967171,ms +error rate,node-stats,0.00,% +Min Throughput,default,61.50,ops/s +Mean Throughput,default,61.50,ops/s +Median Throughput,default,61.50,ops/s +Max Throughput,default,61.50,ops/s +100th percentile latency,default,22.132435999992595,ms +100th percentile service time,default,5.592903999968257,ms +error rate,default,0.00,% +Min Throughput,term,139.78,ops/s +Mean Throughput,term,139.78,ops/s +Median Throughput,term,139.78,ops/s +Max Throughput,term,139.78,ops/s +100th percentile latency,term,11.871426999960022,ms +100th percentile service time,term,3.8017339999214528,ms +error rate,term,0.00,% +Min Throughput,phrase,114.15,ops/s +Mean Throughput,phrase,114.15,ops/s +Median Throughput,phrase,114.15,ops/s +Max Throughput,phrase,114.15,ops/s +100th percentile latency,phrase,14.49915399996371,ms +100th percentile service time,phrase,5.318823999914457,ms +error rate,phrase,0.00,% +Min Throughput,country_agg_uncached,69.79,ops/s +Mean Throughput,country_agg_uncached,69.79,ops/s +Median Throughput,country_agg_uncached,69.79,ops/s +Max Throughput,country_agg_uncached,69.79,ops/s +100th percentile latency,country_agg_uncached,23.401547999924333,ms +100th percentile service time,country_agg_uncached,8.792318000018895,ms +error rate,country_agg_uncached,0.00,% +Min Throughput,country_agg_cached,76.39,ops/s +Mean Throughput,country_agg_cached,76.39,ops/s +Median Throughput,country_agg_cached,76.39,ops/s +Max Throughput,country_agg_cached,76.39,ops/s +100th percentile latency,country_agg_cached,22.500943999943956,ms +100th percentile service time,country_agg_cached,9.051519999957236,ms +error rate,country_agg_cached,0.00,% +Min Throughput,scroll,47.53,pages/s +Mean Throughput,scroll,47.53,pages/s +Median Throughput,scroll,47.53,pages/s +Max Throughput,scroll,47.53,pages/s +100th percentile latency,scroll,82.65710499995294,ms +100th percentile service time,scroll,39.878026999986105,ms +error rate,scroll,0.00,% +Min Throughput,expression,89.08,ops/s +Mean Throughput,expression,89.08,ops/s +Median Throughput,expression,89.08,ops/s +Max Throughput,expression,89.08,ops/s +100th percentile latency,expression,19.51869499998793,ms +100th percentile service time,expression,7.728913000050852,ms +error rate,expression,0.00,% +Min Throughput,painless_static,76.50,ops/s +Mean Throughput,painless_static,76.50,ops/s +Median Throughput,painless_static,76.50,ops/s +Max Throughput,painless_static,76.50,ops/s +100th percentile latency,painless_static,25.17671700002211,ms +100th percentile service time,painless_static,11.82104800000161,ms +error rate,painless_static,0.00,% +Min Throughput,painless_dynamic,81.07,ops/s +Mean Throughput,painless_dynamic,81.07,ops/s +Median Throughput,painless_dynamic,81.07,ops/s +Max Throughput,painless_dynamic,81.07,ops/s +100th percentile latency,painless_dynamic,21.390404999920065,ms +100th percentile service time,painless_dynamic,8.589051999933872,ms +error rate,painless_dynamic,0.00,% +Min Throughput,decay_geo_gauss_function_score,109.06,ops/s +Mean Throughput,decay_geo_gauss_function_score,109.06,ops/s +Median Throughput,decay_geo_gauss_function_score,109.06,ops/s +Max Throughput,decay_geo_gauss_function_score,109.06,ops/s +100th percentile latency,decay_geo_gauss_function_score,15.251723999995193,ms +100th percentile service time,decay_geo_gauss_function_score,5.770663999896897,ms +error rate,decay_geo_gauss_function_score,0.00,% +Min Throughput,decay_geo_gauss_script_score,105.08,ops/s +Mean Throughput,decay_geo_gauss_script_score,105.08,ops/s +Median Throughput,decay_geo_gauss_script_score,105.08,ops/s +Max Throughput,decay_geo_gauss_script_score,105.08,ops/s +100th percentile latency,decay_geo_gauss_script_score,17.79644099997313,ms +100th percentile service time,decay_geo_gauss_script_score,7.966606999957548,ms +error rate,decay_geo_gauss_script_score,0.00,% +Min Throughput,field_value_function_score,116.19,ops/s +Mean Throughput,field_value_function_score,116.19,ops/s +Median Throughput,field_value_function_score,116.19,ops/s +Max Throughput,field_value_function_score,116.19,ops/s +100th percentile latency,field_value_function_score,14.850521000084882,ms +100th percentile service time,field_value_function_score,5.726571000082004,ms +error rate,field_value_function_score,0.00,% +Min Throughput,field_value_script_score,93.56,ops/s +Mean Throughput,field_value_script_score,93.56,ops/s +Median Throughput,field_value_script_score,93.56,ops/s +Max Throughput,field_value_script_score,93.56,ops/s +100th percentile latency,field_value_script_score,21.80660099998022,ms +100th percentile service time,field_value_script_score,10.821197000041138,ms +error rate,field_value_script_score,0.00,% +Min Throughput,large_terms,6.89,ops/s +Mean Throughput,large_terms,6.89,ops/s +Median Throughput,large_terms,6.89,ops/s +Max Throughput,large_terms,6.89,ops/s +100th percentile latency,large_terms,278.7682100000666,ms +100th percentile service time,large_terms,127.296881999996,ms +error rate,large_terms,0.00,% +Min Throughput,large_filtered_terms,12.08,ops/s +Mean Throughput,large_filtered_terms,12.08,ops/s +Median Throughput,large_filtered_terms,12.08,ops/s +Max Throughput,large_filtered_terms,12.08,ops/s +100th percentile latency,large_filtered_terms,163.45256400006747,ms +100th percentile service time,large_filtered_terms,73.9418680000199,ms +error rate,large_filtered_terms,0.00,% +Min Throughput,large_prohibited_terms,8.65,ops/s +Mean Throughput,large_prohibited_terms,8.65,ops/s +Median Throughput,large_prohibited_terms,8.65,ops/s +Max Throughput,large_prohibited_terms,8.65,ops/s +100th percentile latency,large_prohibited_terms,223.2875120000699,ms +100th percentile service time,large_prohibited_terms,101.53825800000504,ms +error rate,large_prohibited_terms,0.00,% +Min Throughput,desc_sort_population,107.80,ops/s +Mean Throughput,desc_sort_population,107.80,ops/s +Median Throughput,desc_sort_population,107.80,ops/s +Max Throughput,desc_sort_population,107.80,ops/s +100th percentile latency,desc_sort_population,15.771683000025405,ms +100th percentile service time,desc_sort_population,6.201829000019643,ms +error rate,desc_sort_population,0.00,% +Min Throughput,asc_sort_population,123.34,ops/s +Mean Throughput,asc_sort_population,123.34,ops/s +Median Throughput,asc_sort_population,123.34,ops/s +Max Throughput,asc_sort_population,123.34,ops/s +100th percentile latency,asc_sort_population,16.886569000007512,ms +100th percentile service time,asc_sort_population,8.308684999974503,ms +error rate,asc_sort_population,0.00,% +Min Throughput,asc_sort_with_after_population,101.53,ops/s +Mean Throughput,asc_sort_with_after_population,101.53,ops/s +Median Throughput,asc_sort_with_after_population,101.53,ops/s +Max Throughput,asc_sort_with_after_population,101.53,ops/s +100th percentile latency,asc_sort_with_after_population,13.655803999995442,ms +100th percentile service time,asc_sort_with_after_population,3.506921999928636,ms +error rate,asc_sort_with_after_population,0.00,% +Min Throughput,desc_sort_geonameid,66.44,ops/s +Mean Throughput,desc_sort_geonameid,66.44,ops/s +Median Throughput,desc_sort_geonameid,66.44,ops/s +Max Throughput,desc_sort_geonameid,66.44,ops/s +100th percentile latency,desc_sort_geonameid,23.736659999940457,ms +100th percentile service time,desc_sort_geonameid,8.313776000022699,ms +error rate,desc_sort_geonameid,0.00,% +Min Throughput,desc_sort_with_after_geonameid,114.98,ops/s +Mean Throughput,desc_sort_with_after_geonameid,114.98,ops/s +Median Throughput,desc_sort_with_after_geonameid,114.98,ops/s +Max Throughput,desc_sort_with_after_geonameid,114.98,ops/s +100th percentile latency,desc_sort_with_after_geonameid,15.178980999962732,ms +100th percentile service time,desc_sort_with_after_geonameid,6.037147999904846,ms +error rate,desc_sort_with_after_geonameid,0.00,% +Min Throughput,asc_sort_geonameid,117.96,ops/s +Mean Throughput,asc_sort_geonameid,117.96,ops/s +Median Throughput,asc_sort_geonameid,117.96,ops/s +Max Throughput,asc_sort_geonameid,117.96,ops/s +100th percentile latency,asc_sort_geonameid,14.10403599993515,ms +100th percentile service time,asc_sort_geonameid,5.3342209999982515,ms +error rate,asc_sort_geonameid,0.00,% +Min Throughput,asc_sort_with_after_geonameid,162.40,ops/s +Mean Throughput,asc_sort_with_after_geonameid,162.40,ops/s +Median Throughput,asc_sort_with_after_geonameid,162.40,ops/s +Max Throughput,asc_sort_with_after_geonameid,162.40,ops/s +100th percentile latency,asc_sort_with_after_geonameid,10.568196999997781,ms +100th percentile service time,asc_sort_with_after_geonameid,4.075862000036068,ms +error rate,asc_sort_with_after_geonameid,0.00,% \ No newline at end of file diff --git a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchMetricReader.cs b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchMetricReader.cs new file mode 100644 index 0000000000..21350a7c88 --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchMetricReader.cs @@ -0,0 +1,90 @@ +namespace CRC.VirtualClient.Actions +{ + using System; + using System.Collections.Generic; + using global::VirtualClient; + using global::VirtualClient.Contracts; + using Microsoft.VisualBasic; + + internal class ElasticsearchMetricReader + { + /// + /// Reads Elasticsearch Rally metrics from the provided report contents. + /// + /// + /// + /// + public static IList Read( + string[] reportContents, + Dictionary metadata) + { + IList metrics = new List(); + + foreach (string line in reportContents) + { + // Metric,Task,Value,Unit + string[] cols = line.Split(','); + + if (cols.Length != 4 || !double.TryParse(cols[2], out double value)) + { + continue; + } + + string metricName = cols[0].ToLower(); + string taskName = cols[1].ToLower(); + string unit = cols[3]; + + int verbosity = 1; // 0: Critical, 1: Standard, 2: Informational. + MetricRelativity relativity = MetricRelativity.Undefined; + + if ( + metricName.StartsWith(MetricNames.Mean) || + metricName.StartsWith(MetricNames.P100)) + { + verbosity = 0; + } + + if (metricName.EndsWith(MetricNames.Throughput)) + { + relativity = MetricRelativity.HigherIsBetter; + } + else if ( + metricName.EndsWith(MetricNames.Latency) || + metricName.EndsWith(MetricNames.ServiceTime) || + metricName.EndsWith(MetricNames.ProcessingTime)) + { + relativity = MetricRelativity.LowerIsBetter; + } + + if (taskName.Length > 0) + { + metricName = $"{taskName}_{metricName}"; + } + + metricName = metricName.Replace(' ', '_').Replace('-', '_'); + + Metric metric = new Metric( + name: metricName, + value: value, + unit: unit, + relativity: relativity, + verbosity: verbosity, + metadata: metadata); + + metrics.Add(metric); + } + + return metrics; + } + + private struct MetricNames + { + public const string Latency = "latency"; + public const string Mean = "mean"; + public const string ServiceTime = "service time"; + public const string P100 = "100th"; + public const string ProcessingTime = "processing time"; + public const string Throughput = "throughput"; + } + } +} diff --git a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyBaseExecutor.cs b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyBaseExecutor.cs new file mode 100644 index 0000000000..c5f6bd1b88 --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyBaseExecutor.cs @@ -0,0 +1,278 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace CRC.VirtualClient.Actions +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Net; + using System.Threading; + using System.Threading.Tasks; + using global::VirtualClient; + using global::VirtualClient.Common.Extensions; + using global::VirtualClient.Common.Telemetry; + using global::VirtualClient.Contracts; + using Microsoft.Extensions.DependencyInjection; + + /// + /// Base class for all Elasticsearch Rally workload executors. + /// + public abstract class ElasticsearchRallyBaseExecutor : VirtualClientComponent + { + /// + /// Constructor for + /// + /// Provides required dependencies to the component. + /// Parameters defined in the profile or supplied on the command line. + public ElasticsearchRallyBaseExecutor(IServiceCollection dependencies, IDictionary parameters) + : base(dependencies, parameters) + { + this.SupportedRoles = new List + { + ClientRole.Client, + ClientRole.Server + }; + } + + /// + /// Disk filter specified + /// + public string DiskFilter + { + get + { + return this.Parameters.GetValue(nameof(this.DiskFilter), "osdisk:false&biggestsize"); + } + } + + /// + /// Elasticsearch Node Port Number + /// + public int Port + { + get + { + return this.Parameters.GetValue(nameof(this.Port), 9200); + } + } + + /// + /// Manages the state of the system. + /// + protected IStateManager StateManager => this.Dependencies.GetService(); + + /// + /// Initializes the environment for execution of the Rally workload. + /// + protected override async Task InitializeAsync(EventContext telemetryContext, CancellationToken cancellationToken) + { + if (!this.IsMultiRoleLayout()) + { + throw new WorkloadException( + $"{this.PackageName} Client/Server requires at least 2 nodes to run", + ErrorReason.LayoutInvalid); + } + + if (!cancellationToken.IsCancellationRequested) + { + IApiClientManager clientManager = this.Dependencies.GetService(); + + ClientInstance instance = this.GetLayoutClientInstance(); + string layoutIPAddress = instance.IPAddress; + + this.ThrowIfLayoutClientIPAddressNotFound(layoutIPAddress); + this.ThrowIfRoleNotSupported(instance.Role); + + ClientInstance clientInstance = this.GetLayoutClientInstances(ClientRole.Client).First(); + + IPAddress.TryParse(clientInstance.IPAddress, out IPAddress clientIpAddress); + telemetryContext.AddContext("ClientIpAddress", clientIpAddress.ToString()); + + IEnumerable serverInstances = this.GetLayoutClientInstances(ClientRole.Server); + + foreach (ClientInstance serverInstance in serverInstances) + { + IPAddress.TryParse(serverInstance.IPAddress, out IPAddress serverIPAddress); + + IApiClient apiClient = clientManager.GetOrCreateApiClient(serverIPAddress.ToString(), serverIPAddress); + this.RegisterToSendExitNotifications($"{this.TypeName}.ExitNotification", apiClient); + } + } + + await Task.CompletedTask; + + return; + } + + /// + /// Get filtered data directory + /// + /// + /// + /// + protected async Task GetDataDirectoryAsync(CancellationToken cancellationToken) + { + string diskPath = string.Empty; + + if (!cancellationToken.IsCancellationRequested) + { + ISystemManagement systemManager = this.Dependencies.GetService(); + + IEnumerable disks = await systemManager.DiskManager.GetDisksAsync(cancellationToken); + + IEnumerable disksToTest = DiskFilters.FilterDisks(disks, this.DiskFilter, this.Platform).ToList(); + + if (disksToTest?.Any() != true) + { + throw new WorkloadException( + "Expected disks to test not found. Given the parameters defined for the profile action/step or those passed " + + "in on the command line, the requisite disks do not exist on the system or could not be identified based on the properties " + + "of the existing disks.", + ErrorReason.DependencyNotFound); + } + + diskPath = $"{disksToTest.First().GetPreferredAccessPath(this.Platform)}"; + } + + return diskPath; + } + + /// + /// Determines whether a file exists at the specified path. + /// + /// The method does not check whether the caller has permission to access the file. + /// Passing an invalid path may result in false being returned. + /// The path to the file to check. This can be either a relative or an absolute path. + /// true if a file exists at the specified path; otherwise, false. + protected virtual bool CheckFileExists(string path) + { + return System.IO.File.Exists(path); + } + + /// + /// Runs a bash script command. + /// + /// + /// Task identifier + /// + /// + /// + protected bool RunCommandScript(EventContext telemetryContext, string key, string script, bool throwOnError = false) + { + bool ok = this.RunCommand("/bin/bash", BuildBashScript(script), out string output, out string error); + + this.HandleTelemetry(telemetryContext, key, script, throwOnError, ok, output, error); + + return ok; + } + + /// + /// Runs a command as root. + /// + /// + /// Task identifier + /// + /// + /// + protected bool RunCommandAsRoot(EventContext telemetryContext, string key, string command, bool throwOnError = false) + { + return this.RunCommandAsUser(telemetryContext, null, key, command, throwOnError); + } + + /// + /// Runs a command as a specific user. + /// + /// + /// + /// + /// + /// + protected bool RunCommandAsUser(string user, string command, out string output, out string error) + { + return + this.RunCommand( + "/usr/bin/sudo", + string.IsNullOrEmpty(user) ? command : $"-u {user} -H bash {BuildBashScript(command)}", + out output, + out error); + } + + /// + /// Runs a command as a specific user. + /// + /// + /// + /// Task identifier + /// + /// + /// + /// + protected bool RunCommandAsUser(EventContext telemetryContext, string user, string key, string command, bool throwOnError = false) + { + bool ok = this.RunCommandAsUser(user, command, out string output, out string error); + + this.HandleTelemetry(telemetryContext, key, command, throwOnError, ok, output, error); + + return ok; + } + + /// + /// Runs a command. + /// + /// + /// + /// + /// + /// + protected virtual bool RunCommand(string command, string arguments, out string output, out string error) + { + output = null; + error = null; + + var psi = new ProcessStartInfo + { + FileName = command, + Arguments = arguments, + RedirectStandardOutput = true, + RedirectStandardError = true, + UseShellExecute = false + }; + + using (var p = Process.Start(psi)) + { + p.WaitForExit(); + output = p.StandardOutput.ReadToEnd().Trim(); + error = p.StandardError.ReadToEnd().Trim(); + return p.ExitCode == 0; + } + } + + private static string BuildBashScript(string script) + { + return string.Concat("-lc \"", script.Replace("\"", "\\\""), "\""); + } + + private void HandleTelemetry(EventContext telemetryContext, string key, string command, bool throwOnError, bool ok, string output, string error) + { + telemetryContext.AddContext($"{this.TypeName}.{key}Command", command); + telemetryContext.AddContext($"{this.TypeName}.{key}Output", output); + telemetryContext.AddContext($"{this.TypeName}.{key}Error", error); + telemetryContext.AddContext($"{this.TypeName}.{key}Ok", ok); + this.Logger.LogMessage($"{this.TypeName}.{key}", telemetryContext); + + if (!ok) + { + this.Logger.LogMessage($"{this.TypeName}.{key}Failed", telemetryContext); + + if (throwOnError) + { + throw new WorkloadException( + $"Rally server configuration failed. Output: {output}; Error: {error}", + ErrorReason.WorkloadUnexpectedAnomaly); + } + } + } + } +} diff --git a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyClientExecutor.cs b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyClientExecutor.cs new file mode 100644 index 0000000000..53e514ead2 --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyClientExecutor.cs @@ -0,0 +1,366 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace CRC.VirtualClient.Actions +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Net; + using System.Threading; + using System.Threading.Tasks; + using global::VirtualClient; + using global::VirtualClient.Common.Extensions; + using global::VirtualClient.Common.Telemetry; + using global::VirtualClient.Contracts; + using global::VirtualClient.Contracts.Metadata; + using Microsoft.Extensions.DependencyInjection; + + /// + /// The Elasticsearch Rally Client workload executor. + /// + public class ElasticsearchRallyClientExecutor : ElasticsearchRallyBaseExecutor + { + /// + /// Initializes a new instance of the class. + /// + /// An enumeration of dependencies that can be used for dependency injection. + /// An enumeration of key-value pairs that can control the execution of the component. + public ElasticsearchRallyClientExecutor(IServiceCollection dependencies, IDictionary parameters = null) + : base(dependencies, parameters) + { + } + + /// + /// The Elasticsearch Distribution Version. + /// + public string DistributionVersion + { + get + { + return this.Parameters.GetValue(nameof(ElasticsearchRallyClientExecutor.DistributionVersion), "8.0.0"); + } + } + + /// + /// The track targeted for run by Rally. + /// + public string TrackName + { + get + { + return this.Parameters.GetValue(nameof(ElasticsearchRallyClientExecutor.TrackName)); + } + } + + /// + /// TestMode indicates whether to run Rally in test mode. + /// + public bool RallyTestMode + { + get + { + return this.Parameters.GetValue(nameof(ElasticsearchRallyClientExecutor.RallyTestMode)); + } + } + + /// + /// Executes the workload. + /// + /// Provides context information that will be captured with telemetry events. + /// A token that can be used to cancel the operation. + protected override async Task ExecuteAsync(EventContext telemetryContext, CancellationToken cancellationToken) + { + await this.Logger.LogMessageAsync($"{this.TypeName}.ExecuteClient", telemetryContext.Clone(), async () => + { + ElasticsearchRallyState state = await this.StateManager.GetStateAsync(nameof(ElasticsearchRallyState), cancellationToken) + ?? new ElasticsearchRallyState(); + + if (cancellationToken.IsCancellationRequested) + { + return; + } + + ClientInstance clientInstance = this.GetLayoutClientInstances(ClientRole.Server).FirstOrDefault(); + IPAddress.TryParse(clientInstance.IPAddress, out IPAddress serverIPAddress); + string targetHost = clientInstance?.IPAddress; + if (string.IsNullOrEmpty(targetHost)) + { + throw new WorkloadException( + $"Elasticsearch Rally Client could not determine the target host from the layout.", + ErrorReason.LayoutInvalid); + } + + string user = this.PlatformSpecifics.GetLoggedInUser(); + int port = this.Port; + string trackName = this.TrackName; + string dataDirectory = await this.GetDataDirectoryAsync(cancellationToken); + + string rallySharedStoragePath = $"{dataDirectory}/esrally"; // Used for large, shareable, reusable data + string rallyUserHomePath = $"/home/{user}"; // Used for user‑specific results and metadata + + if (!state.RallyConfigured) + { + this.StartRallyClient( + user, + targetHost, + port, + trackName, + rallySharedStoragePath, + rallyUserHomePath, + telemetryContext.Clone(), + cancellationToken); + + if (!cancellationToken.IsCancellationRequested) + { + state.RallyConfigured = true; + await this.StateManager.SaveStateAsync(nameof(ElasticsearchRallyState), state, cancellationToken); + } + } + + this.RunRallyClient( + user, + targetHost, + port, + trackName, + rallySharedStoragePath, + rallyUserHomePath, + telemetryContext.Clone(), + cancellationToken); + }); + + return; + } + + /// + /// Reads all lines from the specified report file and returns them as an array of strings. + /// + /// The full path to the report file to read. Cannot be null or an empty string. + /// An array of strings, each representing a line from the report file. The array will be empty if the file + /// contains no lines. + protected virtual string[] ReadReportLines(string reportPath) + { + return System.IO.File.ReadAllLines(reportPath); + } + + /// + /// Checks whether the specified server is available by attempting to connect to the given host and port. + /// + /// This method waits up to timeout seconds before performing the availability check. Override + /// this method to customize the server availability check logic in derived classes. + /// The context for telemetry and logging associated with this operation. + /// The DNS name or IP address of the server to check for availability. Cannot be null or empty. + /// The network port number on the target server to check. Must be a valid TCP port number. + /// The amount of time in milliseconds to wait before performing the availability check. Default is 30000 ms. + /// true if the server at the specified host and port is available; otherwise, false. + protected virtual bool CheckServerAvailable( + EventContext telemetryContext, + string targetHost, + int port, + int timeout = 30000) + { + Thread.Sleep(timeout); // wait for server to be available + + return this.RunCommandAsRoot(telemetryContext, "RallyUrlServerCall", $"curl {targetHost}:{port}"); + } + + private void StartRallyClient( + string user, + string targetHost, + int port, + string trackName, + string rallySharedStoragePath, + string rallyUserHomePath, + EventContext telemetryContext, + CancellationToken cancellationToken) + { + // install es rally + this.RunCommandAsRoot(telemetryContext, "RallyCheckPyhton3", $"python3 --version", true); + this.RunCommandAsRoot(telemetryContext, "RallyCheckPip3", $"pip3 --version", true); + + // using pipx to install esrally, prepare the environment and avoid dependency conflicts + this.RunCommandAsRoot(telemetryContext, "RallySetPixPathRoot", $"pipx ensurepath", true); + + this.RunCommandAsRoot(telemetryContext, "RallyInstall", $"pipx install esrally", true); + + this.RunCommandAsRoot(telemetryContext, "RallyMakeSharedStorage", $"mkdir -p {rallySharedStoragePath}"); + + this.RunCommandAsRoot(telemetryContext, "RallyChown", $"chown -R {user}:{user} {rallySharedStoragePath}", true); + this.RunCommandAsRoot(telemetryContext, "RallySharedStorageCheck", $"ls -ld {rallySharedStoragePath}", true); + this.RunCommandAsUser(telemetryContext, user, "RallyUserTouch", $"echo ok > {rallySharedStoragePath}/test.txt", true); + + this.RunCommandAsRoot(telemetryContext, "RallyChownUserHome", $"chown -R {user}:{user} {rallyUserHomePath}"); + + this.RunESRallyCommand(telemetryContext, user, rallyUserHomePath, rallySharedStoragePath, "RallyCheckEsrallyCheck", "--version"); + this.RunESRallyCommand(telemetryContext, user, rallyUserHomePath, rallySharedStoragePath, "RallyInfo", $"info --track={trackName}"); + this.RunESRallyCommand(telemetryContext, user, rallyUserHomePath, rallySharedStoragePath, "RallyListTracks", "list tracks"); + + // client environment is ready, now we can connect to the Elasticsearch server + + int tries = 0; + while (!this.CheckServerAvailable(telemetryContext, targetHost, port)) + { + if (tries++ > 10) + { + throw new WorkloadException( + $"ElasticSearch Rally Client could not reach the server at {targetHost} after multiple attempts.", + ErrorReason.WorkloadFailed); + } + + this.Logger.LogTraceMessage($"ElasticSearch Rally Client waiting for server {targetHost} to be available..."); + } + } + + private void RunRallyClient( + string user, + string targetHost, + int port, + string trackName, + string rallySharedStoragePath, + string rallyUserHomePath, + EventContext telemetryContext, + CancellationToken cancellationToken) + { + DateTime start = DateTime.Now; + string raceId = Guid.NewGuid().ToString(); + string reportPath = $"{rallySharedStoragePath}/report.csv"; + + string rallyCommand = string.Concat( + "race ", + $"--track={trackName} ", + $"--distribution-version={this.DistributionVersion} ", + $"--target-hosts={targetHost} ", + $"--race-id={raceId} ", + $"--target-hosts={targetHost}:{port} ", + $"--show-in-report=all ", // all, all-percentiles, available + $"--report-format=csv ", + $"--report-file={reportPath} ", + $"--pipeline=benchmark-only ", + $"--runtime-jdk=bundled"); + + if (this.RallyTestMode) + { + rallyCommand = string.Concat(rallyCommand, " --test-mode"); + } + + this.RunESRallyCommand(telemetryContext, user, rallyUserHomePath, rallySharedStoragePath, "RallyExecution", rallyCommand); + + this.RunESRallyCommand(telemetryContext, user, rallyUserHomePath, rallySharedStoragePath, "RallyListRaces", "list races"); + + // race.json is undocumented and not present in esrally 2.5.0 and later versions by default, so we cannot depend on it. + string resultsPath = $"{rallySharedStoragePath}/.rally/benchmarks/races/{raceId}/race.json"; + telemetryContext.AddContext("RallyResultsJsonPath", resultsPath); + telemetryContext.AddContext("RallyReportCsvPath", reportPath); + + if (!this.CheckFileExists(reportPath)) + { + throw new WorkloadException( + $"{this.TypeName}.RallyReportCsvMissing", + ErrorReason.WorkloadUnexpectedAnomaly); + } + else + { + try + { + string[] reportContents = this.ReadReportLines(reportPath); + if (reportContents.Length < 2) + { + this.Logger.LogMessage($"{this.TypeName}.RallyReportCsvInsufficientData", telemetryContext); + return; + } + + telemetryContext.AddContext("RallyReportCsvContents", reportContents.Take(5)); + this.Logger.LogMessage($"{this.TypeName}.RallyReportCsv", telemetryContext); + + this.CaptureMetrics(reportContents, rallyCommand, raceId, start, DateTime.Now, telemetryContext, cancellationToken); + } + catch (Exception ex) + { + throw new WorkloadException( + $"{this.TypeName}.RallyReportCsvFailed", + ex, + ErrorReason.WorkloadUnexpectedAnomaly); + } + } + } + + private void RunESRallyCommand(EventContext telemetryContext, string user, string rallyUserHomePath, string rallySharedStoragePath, string key, string esRallyCommand) + { + // hey points of this solution: + // - avoids dotfiles which are very cumbersome to deal with .net process + // - wrapper script quirks by calling python3 -m + // - inlines all required esrally environment arguments via env. + + var pipxBin = $"{rallyUserHomePath}/.local/bin"; + + // Build PATH deterministically (prepend pipx bin) + var basePath = Environment.GetEnvironmentVariable("PATH") ?? "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"; + var childPath = $"{pipxBin}:{basePath}"; + + string shellCommand = string.Concat( + $"-u {user} -H ", // set user scope + "env ", // set environment arguments for current process session + $"HOME={rallyUserHomePath} ", // user storage + $"RALLY_HOME={rallySharedStoragePath} ", // shared storage + $"XDG_STATE_HOME={rallyUserHomePath}/.local/state ", + $"PATH={childPath} ", + "python3 -m pipx run esrally ", // esrally lives inside the pipx venv, not system Python. + esRallyCommand); + + this.RunCommandAsRoot(telemetryContext, key, shellCommand, true); + } + + private void CaptureMetrics(string[] reportContents, string rallyExecutionArguments, string raceId, DateTime startTime, DateTime exitTime, EventContext telemetryContext, CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + this.MetadataContract.AddForScenario( + "ElasticsearchRally", + rallyExecutionArguments, + toolVersion: null); + + this.MetadataContract.Apply(telemetryContext); + + if (reportContents.Length > 0) + { + try + { + IList metrics = ElasticsearchMetricReader.Read( + reportContents, + new Dictionary + { + ["elasticsearchVersion"] = this.DistributionVersion, + ["rallyTrack"] = this.TrackName, + ["raceId"] = raceId, + }); + + if (this.MetricFilters?.Any() == true) + { + metrics = metrics.FilterBy(this.MetricFilters).ToList(); + } + + this.Logger.LogMetrics( + toolName: "ElasticsearchRally", + scenarioName: this.MetricScenario ?? this.Scenario, + startTime, + exitTime, + metrics, + null, + scenarioArguments: rallyExecutionArguments, + this.Tags, + telemetryContext); + } + catch (Exception exc) + { + throw new WorkloadException( + $"Capture metrics failed.", + exc, + ErrorReason.InvalidResults); + } + } + } + } +} diff --git a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyServerExecutor.cs b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyServerExecutor.cs new file mode 100644 index 0000000000..7a5687029d --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyServerExecutor.cs @@ -0,0 +1,185 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace CRC.VirtualClient.Actions +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using System.Linq; + using System.Net; + using System.Threading; + using System.Threading.Tasks; + using global::VirtualClient; + using global::VirtualClient.Common; + using global::VirtualClient.Common.Telemetry; + using global::VirtualClient.Contracts; + using Microsoft.Extensions.DependencyInjection; + + /// + /// The Elasticsearch Rally Server workload executor. + /// + public class ElasticsearchRallyServerExecutor : ElasticsearchRallyBaseExecutor + { + /// + /// Initializes a new instance of the class. + /// + /// An enumeration of dependencies that can be used for dependency injection. + /// An enumeration of key-value pairs that can control the execution of the component. + public ElasticsearchRallyServerExecutor(IServiceCollection dependencies, IDictionary parameters = null) + : base(dependencies, parameters) + { + } + + /// + /// Initializes the environment for execution of the Rally workload. + /// + protected override async Task InitializeAsync(EventContext telemetryContext, CancellationToken cancellationToken) + { + await base.InitializeAsync(telemetryContext, cancellationToken) + .ConfigureAwait(false); + + await this.Logger.LogMessageAsync($"{this.TypeName}.ConfigureServer", telemetryContext.Clone(), async () => + { + ElasticsearchRallyState state = await this.StateManager.GetStateAsync(nameof(ElasticsearchRallyState), cancellationToken) + ?? new ElasticsearchRallyState(); + + if (!state.ElasticsearchStarted) + { + this.StartElasticSearch(telemetryContext); + + if (!cancellationToken.IsCancellationRequested) + { + state.ElasticsearchStarted = true; + await this.StateManager.SaveStateAsync(nameof(ElasticsearchRallyState), state, cancellationToken); + } + } + }); + } + + /// + /// Executes server side of workload. + /// + /// Provides context information that will be captured with telemetry events. + /// A token that can be used to cancel the operation. + protected override Task ExecuteAsync(EventContext telemetryContext, CancellationToken cancellationToken) + { + return this.Logger.LogMessageAsync($"{nameof(ElasticsearchRallyServerExecutor)}.ExecuteServer", telemetryContext, async () => + { + try + { + this.SetServerOnline(true); + + if (this.IsMultiRoleLayout()) + { + using (BackgroundOperations profiling = BackgroundOperations.BeginProfiling(this, cancellationToken)) + { + await this.WaitAsync(cancellationToken); + } + } + } + finally + { + this.SetServerOnline(false); + } + }); + } + + /// + /// Write all text to a file. + /// + /// The path to the file. + /// The content to write to the file. + protected virtual void WriteAllText(string path, string content) + { + File.WriteAllText(path, content); + } + + /// + /// Read all text from a file. + /// + /// The path to the file. + /// The content of the file. + protected virtual string ReadAllText(string path) + { + return File.ReadAllText(path); + } + + private void StartElasticSearch(EventContext telemetryContext) + { + string scriptsDirectory = this.PlatformSpecifics.GetScriptPath(this.PackageName.ToLower()); + int port = this.Port; + + this.RunCommandAsRoot(telemetryContext, "SetVmMaxMapCount", "sysctl -w vm.max_map_count=262144"); + + // make the change persistent + this.RunCommandScript(telemetryContext, "VmMaxMapCountPersist", "echo \"vm.max_map_count=262144\" | sudo tee /etc/sysctl.d/99-elasticsearch.conf"); + this.RunCommandAsRoot(telemetryContext, "VmMaxMapCountSysCtl", "sysctl --system"); + this.RunCommandAsRoot(telemetryContext, "VmMaxMapCountVerify", "sysctl vm.max_map_count"); + + // LimitMEMLOCKinfinity + this.RunCommandScript(telemetryContext, "LimitMEMLOCKinfinityMkdir", "sudo mkdir -p /etc/systemd/system/elasticsearch.service.d"); + this.RunCommandScript(telemetryContext, "LimitMEMLOCKinfinityPersist", "printf \"[Service]\nLimitMEMLOCK=infinity\nLimitMEMLOCKSoft=infinity\n\" | sudo tee /etc/systemd/system/elasticsearch.service.d/override.conf"); + + // install elasticsearch + // I tried VirtualClient profile script using LinuxPackageInstallation, but I got a reachability error from Juno deployment: Unable to locate package elasticsearch + // So, I followed Elasticsearch documentation here https://www.elastic.co/guide/en/elasticsearch/reference/current/deb.html + this.RunCommandScript(telemetryContext, "ElasticsearchImport", "curl -fsSL https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg"); + this.RunCommandScript(telemetryContext, "ElasticsearchAdd", "echo \"deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main\" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list"); + this.RunCommandAsRoot(telemetryContext, "ElasticsearchUpdate", "apt update"); + this.RunCommandAsRoot(telemetryContext, "ElasticsearchInstall", "apt install elasticsearch -y"); + + // create elasticsearch.yml + string elasticsearchPath = this.PlatformSpecifics.Combine(scriptsDirectory, "elasticsearch.ini"); + if (!this.CheckFileExists(elasticsearchPath)) + { + throw new WorkloadException( + $"The Elasticsearch configuration file (yml) could not be found at the expected path: {elasticsearchPath}", + ErrorReason.WorkloadUnexpectedAnomaly); + } + + telemetryContext.AddContext($"{this.TypeName}.{nameof(elasticsearchPath)}", elasticsearchPath); + this.Logger.LogMessage($"{this.TypeName}.ElasticsearchIniRead", telemetryContext); + string elasticsearchYmlContent = this.ReadAllText(elasticsearchPath); + elasticsearchYmlContent = elasticsearchYmlContent.Replace("$.parameters.port", port.ToString()); + + string elasticsearchPathYml = elasticsearchPath.Replace(".ini", ".yml"); + this.Logger.LogMessage($"{this.TypeName}.ElasticsearchYmlWrite", telemetryContext); + this.WriteAllText(elasticsearchPathYml, elasticsearchYmlContent); + + this.RunCommandAsRoot(telemetryContext, "ElasticsearchYmlCopy", $"cp {elasticsearchPathYml} /etc/elasticsearch/elasticsearch.yml"); + this.RunCommandAsRoot(telemetryContext, "ElasticsearchYml", $"tail -n 10000 /etc/elasticsearch/elasticsearch.yml"); + + // set limits.conf + string limitsConfPath = this.PlatformSpecifics.Combine(scriptsDirectory, "limits.ini"); + this.RunCommandAsRoot(telemetryContext, "ElasticsearchLimitsCopy", $"cp {limitsConfPath} /etc/security/limits.conf"); + this.RunCommandAsRoot(telemetryContext, "ElasticsearchLimits", $"tail -n 10000 /etc/security/limits.conf"); + + this.RunCommandAsRoot(telemetryContext, "ElasticsearchRemoveKeystore", "/usr/share/elasticsearch/bin/elasticsearch-keystore remove xpack.security.transport.ssl.keystore.secure_password"); + this.RunCommandAsRoot(telemetryContext, "ElasticsearchRemoveTruestore", "/usr/share/elasticsearch/bin/elasticsearch-keystore remove xpack.security.transport.ssl.truststore.secure_password"); + + // run elasticsearch + this.RunCommandAsRoot(telemetryContext, "ElasticsearchDaemonReexec", "systemctl daemon-reexec"); + this.RunCommandAsRoot(telemetryContext, "ElasticsearchEnable", "systemctl enable elasticsearch"); + bool ok = this.RunCommandAsRoot(telemetryContext, "ElasticsearchStart", "systemctl start elasticsearch.service"); + Thread.Sleep(30000); // wait for elasticsearch to start + + if (!ok) + { + this.RunCommandAsRoot(telemetryContext, "ElasticsearchRallyClusterLog", $"tail -n 10000 /var/log/elasticsearch/rally-cluster.log"); + + this.RunCommandAsRoot(telemetryContext, "ElasticsearchJournal", "journalctl -xeu elasticsearch.service"); + + throw new WorkloadException( + $"Elasticsearch failed to start.", + ErrorReason.WorkloadUnexpectedAnomaly); + } + + // verify elasticsearch is running + this.RunCommandAsRoot(telemetryContext, "ElasticsearchStatus", "systemctl status elasticsearch.service"); + this.RunCommandAsRoot(telemetryContext, "ElasticsearchSocket", "ss -lnt"); + this.RunCommandAsRoot(telemetryContext, "ElasticsearchUrlCall", $"curl localhost:{port}"); + } + } +} diff --git a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyState.cs b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyState.cs new file mode 100644 index 0000000000..5efa295960 --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyState.cs @@ -0,0 +1,44 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace CRC.VirtualClient.Actions +{ + using System; + using System.Collections.Generic; + using global::VirtualClient.Common.Extensions; + using global::VirtualClient.Contracts; + + internal class ElasticsearchRallyState : State + { + public ElasticsearchRallyState(IDictionary properties = null) + : base(properties) + { + } + + public bool ElasticsearchStarted + { + get + { + return this.Properties.GetValue(nameof(ElasticsearchRallyState.ElasticsearchStarted), false); + } + + set + { + this.Properties[nameof(ElasticsearchRallyState.ElasticsearchStarted)] = value; + } + } + + public bool RallyConfigured + { + get + { + return this.Properties.GetValue(nameof(ElasticsearchRallyState.RallyConfigured), false); + } + + set + { + this.Properties[nameof(ElasticsearchRallyState.RallyConfigured)] = value; + } + } + } +} diff --git a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/elasticsearch.ini b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/elasticsearch.ini new file mode 100644 index 0000000000..1cadcbe279 --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/elasticsearch.ini @@ -0,0 +1,122 @@ +# ======================== Elasticsearch Configuration ========================= +# +# NOTE: Elasticsearch comes with reasonable defaults for most settings. +# Before you set out to tweak and tune the configuration, make sure you +# understand what are you trying to accomplish and the consequences. +# +# The primary way of configuring a node is via this file. This template lists +# the most important settings you may want to configure for a production cluster. +# +# Please consult the documentation for further information on configuration options: +# https://www.elastic.co/guide/en/elasticsearch/reference/index.html +# +# ---------------------------------- Cluster ----------------------------------- +# +# Use a descriptive name for your cluster: +# +cluster.name: rally-cluster # Unique name for your cluster +# +# ------------------------------------ Node ------------------------------------ +# +# Use a descriptive name for the node: +# +node.name: node-1 # Unique name for this node +node.roles: [ master, data, ingest ] # Roles assigned to this node +# +# Add custom attributes to the node: +# +#node.attr.rack: r1 +# +# ----------------------------------- Paths ------------------------------------ +# +# Path to directory where to store the data (separate multiple locations by comma): +# +path.data: /var/lib/elasticsearch +# +# Path to log files: +# +path.logs: /var/log/elasticsearch +# +# ----------------------------------- Memory ----------------------------------- +# +# Lock the memory on startup: +# +bootstrap.memory_lock: true # Prevent swapping +# +# Make sure that the heap size is set to about half the memory available +# on the system and that the owner of the process is allowed to use this +# limit. +# +# Elasticsearch performs poorly when the system is swapping the memory. +# +# ---------------------------------- Network ----------------------------------- +# +# By default Elasticsearch is only accessible on localhost. Set a different +# address here to expose this node on the network: +# +network.host: 0.0.0.0 # Bind address +# +# By default Elasticsearch listens for HTTP traffic on the first free port it +# finds starting at 9200. Set a specific HTTP port here: +# +http.port: $.parameters.port +# +# For more information, consult the network module documentation. +# +# --------------------------------- Discovery ---------------------------------- +# +# Pass an initial list of hosts to perform discovery when this node is started: +# The default list of hosts is ["127.0.0.1", "[::1]"] +# +#discovery.seed_hosts: ["host1", "host2"] +# +# Bootstrap the cluster using an initial set of master-eligible nodes: +# +#cluster.initial_master_nodes: ["node-1", "node-2"] +discovery.type: single-node +# +# For more information, consult the discovery and cluster formation module documentation. +# +# ---------------------------------- Various ----------------------------------- +# +# Allow wildcard deletion of indices: +# +#action.destructive_requires_name: false + +#----------------------- BEGIN SECURITY AUTO CONFIGURATION ----------------------- +# +# The following settings, TLS certificates, and keys have been automatically +# generated to configure Elasticsearch security features on 25-12-2025 17:58:39 +# +# -------------------------------------------------------------------------------- + +# Enable security features +xpack.security.enabled: false +xpack.security.transport.ssl.enabled: false +xpack.security.http.ssl.enabled: false + + +# Enable encryption for HTTP API client connections, such as Kibana, Logstash, and Agents +# xpack.security.http.ssl: +# enabled: true +# keystore.path: certs/http.p12 + +# Enable encryption and mutual authentication between cluster nodes +# xpack.security.transport.ssl: +# enabled: true +# verification_mode: certificate +# keystore.path: certs/transport.p12 +# truststore.path: certs/transport.p12 +# Create a new cluster with the current node only +# Additional nodes can still join the cluster later +#cluster.initial_master_nodes: ["6be45737fab-1"] + +# Allow HTTP API connections from anywhere +# Connections are encrypted and require user authentication +http.host: 0.0.0.0 + +# Allow other nodes to join the cluster from anywhere +# Connections are encrypted and mutually authenticated +#transport.host: 0.0.0.0 + +#----------------------- END SECURITY AUTO CONFIGURATION ------------------------- \ No newline at end of file diff --git a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/limits.ini b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/limits.ini new file mode 100644 index 0000000000..6762991fd7 --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/limits.ini @@ -0,0 +1,76 @@ +# /etc/security/limits.conf +# +#This file sets the resource limits for the users logged in via PAM. +#It does not affect resource limits of the system services. +# +#Also note that configuration files in /etc/security/limits.d directory, +#which are read in alphabetical order, override the settings in this +#file in case the domain is the same or more specific. +#That means, for example, that setting a limit for wildcard domain here +#can be overridden with a wildcard setting in a config file in the +#subdirectory, but a user specific setting here can be overridden only +#with a user specific setting in the subdirectory. +# +#Each line describes a limit for a user in the form: +# +# +# +#Where: +# can be: +# - a user name +# - a group name, with @group syntax +# - the wildcard *, for default entry +# - the wildcard %, can be also used with %group syntax, +# for maxlogin limit +# - NOTE: group and wildcard limits are not applied to root. +# To apply a limit to the root user, must be +# the literal username root. +# +# can have the two values: +# - \"soft\" for enforcing the soft limits +# - \"hard\" for enforcing hard limits +# +# can be one of the following: +# - core - limits the core file size (KB) +# - data - max data size (KB) +# - fsize - maximum filesize (KB) +# - memlock - max locked-in-memory address space (KB) +# - nofile - max number of open file descriptors +# - rss - max resident set size (KB) +# - stack - max stack size (KB) +# - cpu - max CPU time (MIN) +# - nproc - max number of processes +# - as - address space limit (KB) +# - maxlogins - max number of logins for this user +# - maxsyslogins - max number of logins on the system +# - priority - the priority to run user process with +# - locks - max number of file locks the user can hold +# - sigpending - max number of pending signals +# - msgqueue - max memory used by POSIX message queues (bytes) +# - nice - max nice priority allowed to raise to values: [-20, 19] +# - rtprio - max realtime priority +# - chroot - change root to directory (Debian-specific) +# +# +# + +#* soft core 0 +#root hard core 100000 +#* hard rss 10000 +#@student hard nproc 20 +#@faculty soft nproc 20 +#@faculty hard nproc 50 +#ftp hard nproc 0 +#ftp - chroot /ftp +#@student - maxlogins 4 + +* soft memlock unlimited +* hard memlock unlimited +root soft memlock unlimited +root hard memlock unlimited +elasticsearch soft memlock unlimited +elasticsearch hard memlock unlimited +junovmadmin soft memlock unlimited +junovmadmin hard memlock unlimited + +# End of file \ No newline at end of file diff --git a/src/VirtualClient/VirtualClient.Actions/VirtualClient.Actions.csproj b/src/VirtualClient/VirtualClient.Actions/VirtualClient.Actions.csproj index 038749f5b7..087ac55976 100644 --- a/src/VirtualClient/VirtualClient.Actions/VirtualClient.Actions.csproj +++ b/src/VirtualClient/VirtualClient.Actions/VirtualClient.Actions.csproj @@ -29,6 +29,7 @@ + diff --git a/src/VirtualClient/VirtualClient.Main/profiles/PERF-ELASTICSEARCH-RALLY.json b/src/VirtualClient/VirtualClient.Main/profiles/PERF-ELASTICSEARCH-RALLY.json new file mode 100644 index 0000000000..44e7ef5f7e --- /dev/null +++ b/src/VirtualClient/VirtualClient.Main/profiles/PERF-ELASTICSEARCH-RALLY.json @@ -0,0 +1,103 @@ +{ + "Description": "Elasticsearch Rally Workload", + "Metadata": { + "RecommendedMinimumExecutionTime": "24:00:00", + "SupportedPlatforms": "linux-x64,linux-arm64", + "SupportedOperatingSystems": "Debian,Ubuntu" + }, + "Parameters": { + "DiskFilter": "osdisk:false&biggestsize", + "DistributionVersion": "8.0.0", + "Port": "9200", + "RallyTestMode": false + }, + "Actions": [ + { + "Type": "ElasticsearchRallyServerExecutor", + "Parameters": { + "Scenario": "SetupElasticsearchRallyCluster", + "Port": "$.Parameters.Port", + "DiskFilter": "$.Parameters.DiskFilter", + "Role": "Server", + "PackageName": "elasticsearchrally" + } + }, + { + "Type": "ElasticsearchRallyClientExecutor", + "Parameters": { + "Scenario": "ExecuteGeoNamesBenchmark", + "TrackName": "geonames", + "DistributionVersion": "$.Parameters.DistributionVersion", + "Port": "$.Parameters.Port", + "DiskFilter": "$.Parameters.DiskFilter", + "RallyTestMode": "$.Parameters.RallyTestMode", + "Role": "Client" + } + }, + { + "Type": "ElasticsearchRallyClientExecutor", + "Parameters": { + "Scenario": "ExecutePMCBenchmark", + "TrackName": "pmc", + "DistributionVersion": "$.Parameters.DistributionVersion", + "Port": "$.Parameters.Port", + "DiskFilter": "$.Parameters.DiskFilter", + "RallyTestMode": "$.Parameters.RallyTestMode", + "Role": "Client" + } + }, + { + "Type": "ElasticsearchRallyClientExecutor", + "Parameters": { + "Scenario": "ExecuteGeopointBenchmark", + "TrackName": "geopoint", + "DistributionVersion": "$.Parameters.DistributionVersion", + "Port": "$.Parameters.Port", + "DiskFilter": "$.Parameters.DiskFilter", + "RallyTestMode": "$.Parameters.RallyTestMode", + "Role": "Client" + } + }, + { + "Type": "ElasticsearchRallyClientExecutor", + "Parameters": { + "Scenario": "ExecuteHTTPLogsBenchmark", + "TrackName": "http_logs", + "DistributionVersion": "$.Parameters.DistributionVersion", + "Port": "$.Parameters.Port", + "DiskFilter": "$.Parameters.DiskFilter", + "RallyTestMode": "$.Parameters.RallyTestMode", + "Role": "Client" + } + } + ], + "Dependencies": [ + { + "Type": "FormatDisks", + "Parameters": { + "Scenario": "FormatDisks" + } + }, + { + "Type": "MountDisks", + "Parameters": { + "Scenario": "CreateMountPoints" + } + }, + { + "Type": "LinuxPackageInstallation", + "Parameters": { + "Scenario": "InstallLinuxPackages", + "Packages": "python3,python3-pip,python3-venv,pipx,git,pbzip2", + "Role": "Client" + } + }, + { + "Type": "ApiServer", + "Parameters": { + "Scenario": "StartAPIServer", + "Role": "Server" + } + } + ] +} \ No newline at end of file From 5749592607c26cc9356e824ca3a86fea3d66147b Mon Sep 17 00:00:00 2001 From: "Samuel Filho (WIPRO LIMITED)" Date: Sat, 10 Jan 2026 03:36:32 -0300 Subject: [PATCH 2/3] Adding Windows support with parallel download component to speed Elasticsearch package download. New profile parameters to let the user choose Elasticsearch and Rally versions. Implemented the control to let wget or apt to be used for Linux, and WebRequest or parallel download for Windows. New unit tests were built. --- .../ElasticsearchRallyClientExecutorTests.cs | 3 +- .../ElasticsearchRallyServerExecutorTests.cs | 339 +++++++++++++- .../ParallelDownloadHandlerTests.cs | 430 ++++++++++++++++++ .../ElasticsearchRallyBaseExecutor.cs | 82 +++- .../ElasticsearchRallyClientExecutor.cs | 44 +- .../ElasticsearchRallyServerExecutor.cs | 264 +++++++++-- .../ParallelDownloadHandler.cs | 265 +++++++++++ .../profiles/PERF-ELASTICSEARCH-RALLY.json | 30 +- 8 files changed, 1394 insertions(+), 63 deletions(-) create mode 100644 src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ParallelDownloadHandlerTests.cs create mode 100644 src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ParallelDownloadHandler.cs diff --git a/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyClientExecutorTests.cs b/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyClientExecutorTests.cs index 32c1c51ae0..6b3766fd73 100644 --- a/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyClientExecutorTests.cs +++ b/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyClientExecutorTests.cs @@ -52,7 +52,8 @@ public void SetupTest() this.Parameters = new Dictionary() { { nameof(ElasticsearchRallyClientExecutor.DiskFilter), "osdisk:false&biggestsize" }, - { nameof(ElasticsearchRallyClientExecutor.DistributionVersion), "8.0.0" }, + { nameof(ElasticsearchRallyClientExecutor.ElasticsearchVersion), "9.2.3" }, + { nameof(ElasticsearchRallyClientExecutor.RallyVersion), "2.12.0" }, { nameof(ElasticsearchRallyClientExecutor.Port), "9200" }, { nameof(ElasticsearchRallyClientExecutor.RallyTestMode), true }, { nameof(ElasticsearchRallyClientExecutor.Scenario), "ExecuteGeoNamesBenchmark" }, diff --git a/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyServerExecutorTests.cs b/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyServerExecutorTests.cs index 0f96c8f4bd..96f8001530 100644 --- a/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyServerExecutorTests.cs +++ b/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ElasticsearchRallyServerExecutorTests.cs @@ -6,14 +6,14 @@ namespace VirtualClient.Actions using System; using System.Collections.Generic; using System.IO; + using System.Linq; using System.Net.Http; - using System.Reflection; using System.Threading; using System.Threading.Tasks; using CRC.VirtualClient.Actions; + using Microsoft.CodeAnalysis; using Microsoft.Extensions.DependencyInjection; using Moq; - using Newtonsoft.Json.Linq; using NUnit.Framework; using Polly; using VirtualClient.Common.Telemetry; @@ -78,12 +78,16 @@ public void TestElasticsearchRallyServerExecutorInitializeYmlNotFound() } [Test] - public async Task TestElasticsearchRallyServerExecutorInitialize() + [TestCase(true)] + [TestCase(false)] + public async Task TestElasticsearchRallyServerExecutorInitialize(bool useWget) { SetupTest(); bool commandExecuted = false; + this.Parameters.Add("UseWgetForElasticsearhDownloadOnLinux", useWget); + using (TestElasticsearchRallyServerExecutor executor = new TestElasticsearchRallyServerExecutor(this.Dependencies, this.Parameters)) { executor.OnRunCommand = (command, arguments) => @@ -114,13 +118,306 @@ public async Task TestElasticsearchRallyServerExecutorExpectedRun() Assert.IsTrue(commandExecuted); } + [Test] + public void StartElasticsearchWin_ThrowsException_WhenDataDirectoryDoesNotExist() + { + this.Setup(PlatformID.Win32NT); + this.Parameters.Add("ElasticsearchVersion", "8.15.0"); + + this.Directory.Setup(d => d.Exists(It.IsAny())).Returns(false); + + using (var executor = new TestElasticsearchRallyServerExecutor(this.Dependencies, this.Parameters)) + { + executor.FileExists = true; + executor.DataDirectory = this.Combine("C:", "nonexistent"); + + var ex = Assert.ThrowsAsync(async () => + await executor.TestStartElasticsearchWin(EventContext.None, CancellationToken.None)); + + Assert.IsTrue(ex.Message.Contains("data directory could not be found")); + Assert.AreEqual(ErrorReason.WorkloadUnexpectedAnomaly, ex.Reason); + } + } + + [Test] + [TestCase("win-x64", "x86_64")] + [TestCase("win-arm64", "arm_64")] + public async Task StartElasticsearchWin_DownloadsCorrectArchitecture(string platformArchitecture, string expectedArch) + { + this.Setup(PlatformID.Win32NT, platformArchitecture.EndsWith("x64") ? System.Runtime.InteropServices.Architecture.X64 : System.Runtime.InteropServices.Architecture.Arm64); + this.Parameters.Add("ElasticsearchVersion", "8.15.0"); + + string actualDownloadUrl = null; + + using (var executor = new TestElasticsearchRallyServerExecutor(this.Dependencies, this.Parameters)) + { + executor.FileExists = true; + executor.DirectoryExists = true; + executor.DataDirectory = this.Combine("C:", "data"); + executor.TestPlatformArchitectureName = platformArchitecture; + executor.OnDownloadFile = (url, path, ct) => + { + actualDownloadUrl = url; + return Task.CompletedTask; + }; + + await executor.TestStartElasticsearchWin(EventContext.None, CancellationToken.None); + } + + Assert.IsNotNull(actualDownloadUrl); + Assert.IsTrue(actualDownloadUrl.Contains($"windows-{expectedArch}")); + Assert.IsTrue(actualDownloadUrl.Contains("elasticsearch-8.15.0")); + } + + [Test] + public async Task StartElasticsearchWin_ExtractsZipFile() + { + this.Setup(PlatformID.Win32NT); + this.Parameters.Add("ElasticsearchVersion", "8.15.0"); + + var extractCommandExecuted = false; + string extractCommand = null; + + using (var executor = new TestElasticsearchRallyServerExecutor(this.Dependencies, this.Parameters)) + { + executor.FileExists = true; + executor.DirectoryExists = true; + executor.DataDirectory = this.Combine("C:", "data"); + executor.OnRunCommand = (command, arguments) => + { + if (arguments.Contains("Expand-Archive")) + { + extractCommandExecuted = true; + extractCommand = arguments; + } + }; + + await executor.TestStartElasticsearchWin(EventContext.None, CancellationToken.None); + } + + Assert.IsTrue(extractCommandExecuted); + Assert.IsNotNull(extractCommand); + Assert.IsTrue(extractCommand.Contains("Expand-Archive")); + Assert.IsTrue(extractCommand.Contains("elasticsearch-8.15.0.zip")); + } + + [Test] + public async Task StartElasticsearchWin_CopiesElasticsearchYmlToConfigDirectory() + { + this.Setup(PlatformID.Win32NT); + this.Parameters.Add("ElasticsearchVersion", "8.15.0"); + + var ymlCopied = false; + string sourcePath = null; + string destinationPath = null; + + using (var executor = new TestElasticsearchRallyServerExecutor(this.Dependencies, this.Parameters)) + { + executor.FileExists = true; + executor.DirectoryExists = true; + executor.DataDirectory = this.Combine("C:", "data"); + executor.OnFileCopy = (source, dest, overwrite) => + { + sourcePath = source; + destinationPath = dest; + ymlCopied = true; + }; + + await executor.TestStartElasticsearchWin(EventContext.None, CancellationToken.None); + } + + Assert.IsTrue(ymlCopied); + Assert.IsNotNull(sourcePath); + Assert.IsNotNull(destinationPath); + Assert.IsTrue(sourcePath.Contains("elasticsearch.yml")); + Assert.IsTrue(destinationPath.Contains("config")); + Assert.IsTrue(destinationPath.Contains("elasticsearch.yml")); + } + + [Test] + public async Task StartElasticsearchWin_DisablesFirewall() + { + this.Setup(PlatformID.Win32NT); + this.Parameters.Add("ElasticsearchVersion", "8.15.0"); + + var firewallCommandExecuted = false; + string firewallCommand = null; + + using (var executor = new TestElasticsearchRallyServerExecutor(this.Dependencies, this.Parameters)) + { + executor.FileExists = true; + executor.DirectoryExists = true; + executor.DataDirectory = this.Combine("C:", "data"); + executor.OnRunCommand = (command, arguments) => + { + if (arguments.Contains("Set-NetFirewallProfile")) + { + firewallCommandExecuted = true; + firewallCommand = arguments; + } + }; + + await executor.TestStartElasticsearchWin(EventContext.None, CancellationToken.None); + } + + Assert.IsTrue(firewallCommandExecuted); + Assert.IsNotNull(firewallCommand); + Assert.IsTrue(firewallCommand.Contains("Set-NetFirewallProfile")); + Assert.IsTrue(firewallCommand.Contains("Enabled False")); + } + + [Test] + [TestCase(9200)] + [TestCase(9300)] + [TestCase(8080)] + public async Task StartElasticsearchWin_StartsElasticsearchWithCorrectPort(int port) + { + this.Setup(PlatformID.Win32NT); + this.Parameters[nameof(ElasticsearchRallyServerExecutor.Port)] = port.ToString(); + this.Parameters.Add("ElasticsearchVersion", "8.15.0"); + + var startCommandExecuted = false; + string startCommand = null; + + using (var executor = new TestElasticsearchRallyServerExecutor(this.Dependencies, this.Parameters)) + { + executor.FileExists = true; + executor.DirectoryExists = true; + executor.DataDirectory = this.Combine("C:", "data"); + executor.OnRunCommand = (command, arguments) => + { + if (arguments.Contains("elasticsearch.bat")) + { + startCommandExecuted = true; + startCommand = arguments; + } + }; + + await executor.TestStartElasticsearchWin(EventContext.None, CancellationToken.None); + } + + Assert.IsTrue(startCommandExecuted); + Assert.IsNotNull(startCommand); + Assert.IsTrue(startCommand.Contains("elasticsearch.bat")); + Assert.IsTrue(startCommand.Contains($"-E http.port={port}")); + } + + [Test] + public async Task StartElasticsearchWin_VerifiesElasticsearchIsRunning() + { + this.Setup(PlatformID.Win32NT); + this.Parameters.Add("ElasticsearchVersion", "8.15.0"); + + var pingCommandExecuted = false; + string pingCommand = null; + + using (var executor = new TestElasticsearchRallyServerExecutor(this.Dependencies, this.Parameters)) + { + executor.FileExists = true; + executor.DirectoryExists = true; + executor.DataDirectory = this.Combine("C:", "data"); + executor.OnRunCommand = (command, arguments) => + { + if (arguments.Contains("Invoke-WebRequest") && arguments.Contains("localhost:9200")) + { + pingCommandExecuted = true; + pingCommand = arguments; + } + }; + + await executor.TestStartElasticsearchWin(EventContext.None, CancellationToken.None); + } + + Assert.IsTrue(pingCommandExecuted); + Assert.IsNotNull(pingCommand); + Assert.IsTrue(pingCommand.Contains("Invoke-WebRequest")); + Assert.IsTrue(pingCommand.Contains("localhost:9200")); + } + + [Test] + [TestCase(true)] + [TestCase(false)] + public async Task StartElasticsearchWin_UsesCorrectDownloadMethod(bool useWebRequest) + { + this.Setup(PlatformID.Win32NT); + this.Parameters.Add("ElasticsearchVersion", "8.15.0"); + this.Parameters.Add("UseWebRequestForElasticsearchDownloadOnWindows", useWebRequest); + + var webRequestUsed = false; + var parallelDownloadUsed = false; + + using (var executor = new TestElasticsearchRallyServerExecutor(this.Dependencies, this.Parameters)) + { + executor.FileExists = true; + executor.DirectoryExists = true; + executor.DataDirectory = this.Combine("C:", "data"); + executor.OnRunCommand = (command, arguments) => + { + if (arguments.Contains("Invoke-WebRequest") && arguments.Contains("artifacts.elastic.co")) + { + webRequestUsed = true; + } + }; + executor.OnDownloadFile = (url, path, ct) => + { + parallelDownloadUsed = true; + return Task.CompletedTask; + }; + + await executor.TestStartElasticsearchWin(EventContext.None, CancellationToken.None); + } + + if (useWebRequest) + { + Assert.IsTrue(webRequestUsed); + Assert.IsFalse(parallelDownloadUsed); + } + else + { + Assert.IsFalse(webRequestUsed); + Assert.IsTrue(parallelDownloadUsed); + } + } + + [Test] + public async Task StartElasticsearchWin_AddsCorrectTelemetryContext() + { + this.Setup(PlatformID.Win32NT); + this.Parameters.Add("ElasticsearchVersion", "8.15.0"); + + var telemetryContext = new EventContext(Guid.NewGuid()); + + using (var executor = new TestElasticsearchRallyServerExecutor(this.Dependencies, this.Parameters)) + { + executor.FileExists = true; + executor.DirectoryExists = true; + executor.DataDirectory = this.Combine("C:", "data"); + + await executor.TestStartElasticsearchWin(telemetryContext, CancellationToken.None); + } + + Assert.IsTrue(telemetryContext.Properties.ContainsKey("elasticsearchVersion")); + Assert.IsTrue(telemetryContext.Properties.ContainsKey("port")); + Assert.IsTrue(telemetryContext.Properties.ContainsKey("platformArchitecture")); + Assert.IsTrue(telemetryContext.Properties.ContainsKey("distroVersion")); + } + private class TestElasticsearchRallyServerExecutor : ElasticsearchRallyServerExecutor { public Action OnRunCommand { get; set; } + public Action OnFileCopy { get; set; } + public Func OnDownloadFile { get; set; } public bool FileExists { get; set; } + public bool DirectoryExists { get; set; } + public string DataDirectory { get; set; } + public string TestPlatformArchitectureName { get; set; } + public TestElasticsearchRallyServerExecutor(IServiceCollection dependencies, IDictionary parameters) : base(dependencies, parameters) { + this.DataDirectory = "/data"; + this.PackageName = "elasticsearchrally"; + this.WaitForElasticsearchAvailabilityTimeout = 0; } public new Task InitializeAsync(EventContext context, CancellationToken cancellationToken) @@ -133,6 +430,20 @@ public TestElasticsearchRallyServerExecutor(IServiceCollection dependencies, IDi return base.ExecuteAsync(context, cancellationToken); } + public async Task TestStartElasticsearchWin(EventContext context, CancellationToken cancellationToken) + { + var method = typeof(ElasticsearchRallyServerExecutor).GetMethod( + "StartElasticsearchWin", + System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Instance); + + await (Task)method.Invoke(this, new object[] { context, cancellationToken }); + } + + protected override Task GetDataDirectoryAsync(CancellationToken cancellationToken) + { + return Task.FromResult(this.DataDirectory); + } + protected override bool RunCommand(string command, string arguments, out string output, out string error) { output = string.Empty; @@ -142,12 +453,21 @@ protected override bool RunCommand(string command, string arguments, out string return true; } + protected override void RunCommandWindowsScriptDetached(EventContext telemetryContext, string key, string script) + { + OnRunCommand?.Invoke(key, script); + } protected override bool CheckFileExists(string path) { return this.FileExists; } + protected override bool CheckDirectoryExists(string path) + { + return this.DirectoryExists; + } + protected override void WriteAllText(string path, string content) { @@ -157,7 +477,18 @@ protected override string ReadAllText(string path) { return "sample text"; } - } + protected override void FileCopy(string sourcePath, string destinationPath, bool overwrite) + { + OnFileCopy?.Invoke(sourcePath, destinationPath, overwrite); + } + + protected override Task ParallelDownloadFile(string url, string destinationPath, CancellationToken cancellationToken) + { + OnDownloadFile?.Invoke(url, destinationPath, cancellationToken); + + return Task.CompletedTask; + } + } } } diff --git a/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ParallelDownloadHandlerTests.cs b/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ParallelDownloadHandlerTests.cs new file mode 100644 index 0000000000..8e7a724f64 --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ParallelDownloadHandlerTests.cs @@ -0,0 +1,430 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace VirtualClient.Actions +{ + using System; + using System.Collections.Generic; + using System.IO; + using System.Net; + using System.Net.Http; + using System.Threading; + using System.Threading.Tasks; + using CRC.VirtualClient.Actions; + using Moq; + using Moq.Protected; + using NUnit.Framework; + + [TestFixture] + [Category("Unit")] + public class ParallelDownloadHandlerTests : MockFixture + { + private MockParallelDownloadHandler mockParallelDownloadHandler; + private Mock mockHttpMessageHandler; + private string testUrl; + private string testDestinationPath; + + [SetUp] + public void SetupTest() + { + this.Setup(PlatformID.Unix); + + this.testUrl = "https://example.com/testfile.zip"; + this.testDestinationPath = this.Combine(this.GetPackagePath(), "testfile.zip"); + + this.mockHttpMessageHandler = new Mock(); + + this.mockParallelDownloadHandler = new MockParallelDownloadHandler() + { + HttpClient = new HttpClient(this.mockHttpMessageHandler.Object), + FileExists = false, + FileLength = 0 + }; + + this.File.Reset(); + this.File.Setup(f => f.Exists(It.IsAny())).Returns(false); + + this.Directory.Setup(d => d.Exists(It.IsAny())).Returns(true); + this.Directory.Setup(d => d.CreateDirectory(It.IsAny())).Returns(this.DirectoryInfo.Object); + this.Directory.Setup(d => d.Delete(It.IsAny(), It.IsAny())); + + this.FileSystem.SetupGet(fs => fs.File).Returns(this.File.Object); + this.FileSystem.SetupGet(fs => fs.Directory).Returns(this.Directory.Object); + } + + [Test] + public async Task DownloadFile_PerformsSingleThreadedDownload_WhenServerDoesNotSupportRangeRequests() + { + long fileSize = 1024 * 1024; // 1 MB + byte[] fileContent = new byte[fileSize]; + new Random().NextBytes(fileContent); + + // Setup HEAD response - no range support + var headResponse = new HttpResponseMessage(HttpStatusCode.OK); + headResponse.Content = new ByteArrayContent(Array.Empty()); + headResponse.Content.Headers.ContentLength = fileSize; + headResponse.Headers.AcceptRanges.Clear(); // No range support + + // Setup GET response + var getResponse = new HttpResponseMessage(HttpStatusCode.OK); + getResponse.Content = new ByteArrayContent(fileContent); + getResponse.Content.Headers.ContentLength = fileSize; + + int requestCount = 0; + this.mockHttpMessageHandler.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync((HttpRequestMessage request, CancellationToken token) => + { + if (request.Method == HttpMethod.Head) + { + return headResponse; + } + + requestCount++; + return getResponse; + }); + + MemoryStream downloadedStream = new MemoryStream(); + this.FileStream.Setup(fs => fs.New(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((string path, FileMode mode, FileAccess access, FileShare share, int bufferSize, bool useAsync) => + { + var stream = new ESMockFileSystemStream(); + return stream; + }); + + this.mockParallelDownloadHandler.FileExists = true; + this.mockParallelDownloadHandler.FileLength = fileSize; + + await ParallelDownloadHandler.DownloadFile( + this.testUrl, + this.testDestinationPath, + CancellationToken.None, + parallel: 8, + timeout: 30, + chunkMb: 10, + parallelDownloadHandler: this.mockParallelDownloadHandler); + + // Should use single-threaded download (1 GET request) + Assert.AreEqual(1, requestCount); + } + + [Test] + public async Task DownloadFile_PerformsParallelDownload_WhenServerSupportsRangeRequests() + { + long fileSize = 10 * 1024 * 1024; // 10 MB + byte[] fileContent = new byte[fileSize]; + new Random().NextBytes(fileContent); + + // Setup HEAD response with range support + var headResponse = new HttpResponseMessage(HttpStatusCode.OK); + headResponse.Content = new ByteArrayContent(Array.Empty()); + headResponse.Content.Headers.ContentLength = fileSize; + headResponse.Headers.AcceptRanges.Add("bytes"); + + var rangeRequests = new List(); + this.mockHttpMessageHandler.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync((HttpRequestMessage request, CancellationToken token) => + { + if (request.Method == HttpMethod.Head) + { + return headResponse; + } + + // Track range requests + if (request.Headers.Range != null) + { + rangeRequests.Add(request.Headers.Range.ToString()); + + var range = request.Headers.Range.Ranges.GetEnumerator(); + range.MoveNext(); + long start = range.Current.From.Value; + long end = range.Current.To.Value; + int length = (int)(end - start + 1); + + var response = new HttpResponseMessage(HttpStatusCode.PartialContent); + response.Content = new ByteArrayContent(fileContent, (int)start, length); + return response; + } + + return new HttpResponseMessage(HttpStatusCode.BadRequest); + }); + + var tempDir = this.Combine(this.GetPackagePath(), $".testfile.zip.parts"); + this.Directory.Setup(d => d.Exists(tempDir)).Returns(true); + + var partFiles = new Dictionary(); + this.FileStream.Setup(fs => fs.New(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((string path, FileMode mode, FileAccess access, FileShare share, int bufferSize, bool useAsync) => + { + if (path.Contains(".part")) + { + var stream = new ESMockFileSystemStream(); + partFiles[path] = stream; + return stream; + } + return new ESMockFileSystemStream(); + }); + + this.mockParallelDownloadHandler.FileExists = true; + this.mockParallelDownloadHandler.FileLength = fileSize; + + await ParallelDownloadHandler.DownloadFile( + this.testUrl, + this.testDestinationPath, + CancellationToken.None, + parallel: 2, + timeout: 30, + chunkMb: 5, + parallelDownloadHandler: this.mockParallelDownloadHandler); + + // Should have made multiple range requests + Assert.Greater(rangeRequests.Count, 1); + } + + [Test] + public async Task DownloadFile_HandlesCancellation_Gracefully() + { + var headResponse = new HttpResponseMessage(HttpStatusCode.OK); + headResponse.Content = new ByteArrayContent(Array.Empty()); + headResponse.Content.Headers.ContentLength = 1024; + headResponse.Headers.AcceptRanges.Clear(); + + this.mockHttpMessageHandler.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync(headResponse); + + using (var cts = new CancellationTokenSource()) + { + cts.Cancel(); + + // Should not throw, just return + await ParallelDownloadHandler.DownloadFile( + this.testUrl, + this.testDestinationPath, + cts.Token, + parallel: 8, + timeout: 30, + chunkMb: 10, + parallelDownloadHandler: this.mockParallelDownloadHandler); + + Assert.Pass("Cancellation handled gracefully"); + } + } + + [Test] + public async Task DownloadFile_UsesDefaultParameters_WhenNotSpecified() + { + long fileSize = 1024; + var headResponse = new HttpResponseMessage(HttpStatusCode.OK); + headResponse.Content = new ByteArrayContent(Array.Empty()); + headResponse.Content.Headers.ContentLength = fileSize; + headResponse.Headers.AcceptRanges.Clear(); + + var getResponse = new HttpResponseMessage(HttpStatusCode.OK); + getResponse.Content = new ByteArrayContent(new byte[fileSize]); + + this.mockHttpMessageHandler.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync((HttpRequestMessage request, CancellationToken token) => + { + return request.Method == HttpMethod.Head ? headResponse : getResponse; + }); + + this.FileStream.Setup(fs => fs.New(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((string path, FileMode mode, FileAccess access, FileShare share, int bufferSize, bool useAsync) => + { + var stream = new ESMockFileSystemStream(); + return stream; + }); + + this.mockParallelDownloadHandler.FileExists = true; + this.mockParallelDownloadHandler.FileLength = fileSize; + + // Should use default parameters: parallel=8, timeout=30, chunkMb=10 + await ParallelDownloadHandler.DownloadFile( + this.testUrl, + this.testDestinationPath, + CancellationToken.None, + parallelDownloadHandler: this.mockParallelDownloadHandler); + Assert.Pass("Default parameters used successfully"); + } + + [Test] + public async Task DownloadFile_FallsBackToSingleThreaded_WhenContentLengthIsUnknown() + { + // Setup HEAD response with no content length + var headResponse = new HttpResponseMessage(HttpStatusCode.OK); + headResponse.Content = new ByteArrayContent(Array.Empty()); + headResponse.Headers.AcceptRanges.Add("bytes"); + + var getResponse = new HttpResponseMessage(HttpStatusCode.OK); + byte[] content = new byte[1024]; + getResponse.Content = new ByteArrayContent(content); + + int getRequestCount = 0; + this.mockHttpMessageHandler.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync((HttpRequestMessage request, CancellationToken token) => + { + if (request.Method == HttpMethod.Head) + { + return headResponse; + } + + getRequestCount++; + return getResponse; + }); + + this.FileStream.Setup(fs => fs.New(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((string path, FileMode mode, FileAccess access, FileShare share, int bufferSize, bool useAsync) => + { + var stream = new ESMockFileSystemStream(); + return stream; + }); + + this.mockParallelDownloadHandler.FileExists = true; + + await ParallelDownloadHandler.DownloadFile( + this.testUrl, + this.testDestinationPath, + CancellationToken.None, + parallel: 8, + parallelDownloadHandler: this.mockParallelDownloadHandler); + + // Should fall back to single-threaded download + Assert.AreEqual(1, getRequestCount); + } + + [Test] + public async Task DownloadFile_FallsBackToSingleThreaded_WhenParallelIsOne() + { + long fileSize = 10 * 1024 * 1024; + var headResponse = new HttpResponseMessage(HttpStatusCode.OK); + headResponse.Content = new ByteArrayContent(Array.Empty()); + headResponse.Content.Headers.ContentLength = fileSize; + headResponse.Headers.AcceptRanges.Add("bytes"); + + var getResponse = new HttpResponseMessage(HttpStatusCode.OK); + getResponse.Content = new ByteArrayContent(new byte[fileSize]); + + int getRequestCount = 0; + this.mockHttpMessageHandler.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync((HttpRequestMessage request, CancellationToken token) => + { + if (request.Method == HttpMethod.Head) + { + return headResponse; + } + + getRequestCount++; + return getResponse; + }); + + this.FileStream.Setup(fs => fs.New(It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((string path, FileMode mode, FileAccess access, FileShare share, int bufferSize, bool useAsync) => + { + var stream = new ESMockFileSystemStream(); + return stream; + }); + + this.mockParallelDownloadHandler.FileExists = true; + this.mockParallelDownloadHandler.FileLength = fileSize; + + await ParallelDownloadHandler.DownloadFile( + this.testUrl, + this.testDestinationPath, + CancellationToken.None, + parallel: 1, + parallelDownloadHandler: this.mockParallelDownloadHandler); + + // Should use single-threaded download even though server supports ranges + Assert.AreEqual(1, getRequestCount); + } + + [Test] + public void DownloadFile_ThrowsHttpRequestException_WhenHeadRequestFails() + { + this.mockHttpMessageHandler.Protected() + .Setup>( + "SendAsync", + ItExpr.IsAny(), + ItExpr.IsAny()) + .ReturnsAsync(new HttpResponseMessage(HttpStatusCode.NotFound)); + + Assert.ThrowsAsync(async () => + { + await ParallelDownloadHandler.DownloadFile( + this.testUrl, + this.testDestinationPath, + CancellationToken.None); + }); + } + + /// + /// Mock file system stream for Elasticsearch Rally download tests. + /// + private class ESMockFileSystemStream : InMemoryFileSystemStream + { + /// + /// Initializes a new instance of the class. + /// + public ESMockFileSystemStream() + : base() + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The underlying stream. + /// The file path. + /// Whether the stream is asynchronous. + public ESMockFileSystemStream(Stream stream, string path, bool isAsync) + : base(stream, path, isAsync) + { + } + } + + private class MockParallelDownloadHandler : ParallelDownloadHandler.IParallelDownloadHandler + { + public HttpClient? HttpClient { get; set; } + + public bool FileExists { get; set; } + + public long FileLength { get; set; } + + public FileStream CreateFileStream(string destinationPath) + { + return new MockFileStream(); + } + } + + private class MockFileStream : FileStream + { + public MockFileStream() + : base("mockfile.txt", FileMode.Create) + { + } + } + } +} \ No newline at end of file diff --git a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyBaseExecutor.cs b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyBaseExecutor.cs index c5f6bd1b88..78f00586d6 100644 --- a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyBaseExecutor.cs +++ b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyBaseExecutor.cs @@ -6,6 +6,7 @@ namespace CRC.VirtualClient.Actions using System; using System.Collections.Generic; using System.Diagnostics; + using System.IO; using System.Linq; using System.Net; using System.Threading; @@ -36,6 +37,17 @@ public ElasticsearchRallyBaseExecutor(IServiceCollection dependencies, IDictiona }; } + /// + /// The Elasticsearch Distribution Version. + /// + public string ElasticsearchVersion + { + get + { + return this.Parameters.GetValue(nameof(ElasticsearchRallyServerExecutor.ElasticsearchVersion), "9.2.3"); + } + } + /// /// Disk filter specified /// @@ -78,7 +90,7 @@ protected override async Task InitializeAsync(EventContext telemetryContext, Can if (!cancellationToken.IsCancellationRequested) { IApiClientManager clientManager = this.Dependencies.GetService(); - + ClientInstance instance = this.GetLayoutClientInstance(); string layoutIPAddress = instance.IPAddress; @@ -112,7 +124,7 @@ protected override async Task InitializeAsync(EventContext telemetryContext, Can /// /// /// - protected async Task GetDataDirectoryAsync(CancellationToken cancellationToken) + protected virtual async Task GetDataDirectoryAsync(CancellationToken cancellationToken) { string diskPath = string.Empty; @@ -148,7 +160,66 @@ protected async Task GetDataDirectoryAsync(CancellationToken cancellatio /// true if a file exists at the specified path; otherwise, false. protected virtual bool CheckFileExists(string path) { - return System.IO.File.Exists(path); + return File.Exists(path); + } + + /// + /// Determines whether the specified directory exists. + /// + /// The path to the directory to check. + /// true if the directory exists; otherwise, false. + protected virtual bool CheckDirectoryExists(string path) + { + return Directory.Exists(path); + } + + /// + /// Copies a file from the source path to the destination path. + /// + /// The path of the file to copy. + /// The destination path where the file should be copied. + /// Whether to overwrite the file if it already exists at the destination. + protected virtual void FileCopy(string sourcePath, string destinationPath, bool overwrite) + { + File.Copy(sourcePath, destinationPath, overwrite); + } + + /// + /// Runs a windows script command. + /// + /// + /// Task identifier + /// + /// + /// + protected bool RunCommandWindowsScript(EventContext telemetryContext, string key, string script, bool throwOnError = false) + { + bool ok = this.RunCommand("powershell.exe", BuildWindowsScript(script), out string output, out string error); + + this.HandleTelemetry(telemetryContext, key, script, throwOnError, ok, output, error); + + return ok; + } + + /// + /// Runs a windows script command without waiting for it to complete. + /// + /// + /// Task identifier + /// + protected virtual void RunCommandWindowsScriptDetached(EventContext telemetryContext, string key, string script) + { + telemetryContext.AddContext($"{this.TypeName}.{key}Command", $"cmd.exe {script}"); + this.Logger.LogMessage($"{this.TypeName}.{key}", telemetryContext); + + Process.Start(new ProcessStartInfo + { + FileName = "cmd.exe", + Arguments = script, + RedirectStandardOutput = false, + RedirectStandardError = false, + UseShellExecute = false + }); } /// @@ -249,6 +320,11 @@ protected virtual bool RunCommand(string command, string arguments, out string o } } + private static string BuildWindowsScript(string script) + { + return $"-NoProfile -ExecutionPolicy Bypass -Command \"{script}\""; + } + private static string BuildBashScript(string script) { return string.Concat("-lc \"", script.Replace("\"", "\\\""), "\""); diff --git a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyClientExecutor.cs b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyClientExecutor.cs index 53e514ead2..3dee0a7fba 100644 --- a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyClientExecutor.cs +++ b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyClientExecutor.cs @@ -32,13 +32,14 @@ public ElasticsearchRallyClientExecutor(IServiceCollection dependencies, IDictio } /// - /// The Elasticsearch Distribution Version. + /// The Rally Distribution Version. + /// If not specified, the Rally latest version will be used. /// - public string DistributionVersion + public string RallyVersion { get { - return this.Parameters.GetValue(nameof(ElasticsearchRallyClientExecutor.DistributionVersion), "8.0.0"); + return this.Parameters.GetValue(nameof(ElasticsearchRallyClientExecutor.RallyVersion)); } } @@ -92,7 +93,7 @@ await this.Logger.LogMessageAsync($"{this.TypeName}.ExecuteClient", telemetryCon } string user = this.PlatformSpecifics.GetLoggedInUser(); - int port = this.Port; + int port = this.Port; string trackName = this.TrackName; string dataDirectory = await this.GetDataDirectoryAsync(cancellationToken); @@ -108,7 +109,7 @@ await this.Logger.LogMessageAsync($"{this.TypeName}.ExecuteClient", telemetryCon trackName, rallySharedStoragePath, rallyUserHomePath, - telemetryContext.Clone(), + telemetryContext.Clone(), cancellationToken); if (!cancellationToken.IsCancellationRequested) @@ -151,13 +152,13 @@ protected virtual string[] ReadReportLines(string reportPath) /// The context for telemetry and logging associated with this operation. /// The DNS name or IP address of the server to check for availability. Cannot be null or empty. /// The network port number on the target server to check. Must be a valid TCP port number. - /// The amount of time in milliseconds to wait before performing the availability check. Default is 30000 ms. + /// The amount of time in milliseconds to wait before performing the availability check. Default is 60000 ms. /// true if the server at the specified host and port is available; otherwise, false. protected virtual bool CheckServerAvailable( EventContext telemetryContext, string targetHost, int port, - int timeout = 30000) + int timeout = 60000) { Thread.Sleep(timeout); // wait for server to be available @@ -171,7 +172,7 @@ private void StartRallyClient( string trackName, string rallySharedStoragePath, string rallyUserHomePath, - EventContext telemetryContext, + EventContext telemetryContext, CancellationToken cancellationToken) { // install es rally @@ -181,7 +182,13 @@ private void StartRallyClient( // using pipx to install esrally, prepare the environment and avoid dependency conflicts this.RunCommandAsRoot(telemetryContext, "RallySetPixPathRoot", $"pipx ensurepath", true); - this.RunCommandAsRoot(telemetryContext, "RallyInstall", $"pipx install esrally", true); + string esRallyInstallCommand = "pipx install esrally"; + if (!string.IsNullOrEmpty(this.RallyVersion)) + { + esRallyInstallCommand = $"{esRallyInstallCommand}=={this.RallyVersion}"; + } + + this.RunCommandAsRoot(telemetryContext, "RallyInstall", esRallyInstallCommand, true); this.RunCommandAsRoot(telemetryContext, "RallyMakeSharedStorage", $"mkdir -p {rallySharedStoragePath}"); @@ -200,15 +207,18 @@ private void StartRallyClient( int tries = 0; while (!this.CheckServerAvailable(telemetryContext, targetHost, port)) { - if (tries++ > 10) + int limit = 20; + if (tries++ >= limit) { throw new WorkloadException( - $"ElasticSearch Rally Client could not reach the server at {targetHost} after multiple attempts.", + $"Elasticsearch Rally Client could not reach the server at {targetHost} after {limit} attempts.", ErrorReason.WorkloadFailed); } - this.Logger.LogTraceMessage($"ElasticSearch Rally Client waiting for server {targetHost} to be available..."); + this.Logger.LogMessage($"{this.TypeName}.ElasticsearchServerConnectionAttempt-{tries}-of-{limit}", telemetryContext); } + + this.Logger.LogMessage($"{this.TypeName}.ElasticsearchServerIsReady", telemetryContext); } private void RunRallyClient( @@ -218,7 +228,7 @@ private void RunRallyClient( string trackName, string rallySharedStoragePath, string rallyUserHomePath, - EventContext telemetryContext, + EventContext telemetryContext, CancellationToken cancellationToken) { DateTime start = DateTime.Now; @@ -228,7 +238,6 @@ private void RunRallyClient( string rallyCommand = string.Concat( "race ", $"--track={trackName} ", - $"--distribution-version={this.DistributionVersion} ", $"--target-hosts={targetHost} ", $"--race-id={raceId} ", $"--target-hosts={targetHost}:{port} ", @@ -248,7 +257,7 @@ private void RunRallyClient( this.RunESRallyCommand(telemetryContext, user, rallyUserHomePath, rallySharedStoragePath, "RallyListRaces", "list races"); // race.json is undocumented and not present in esrally 2.5.0 and later versions by default, so we cannot depend on it. - string resultsPath = $"{rallySharedStoragePath}/.rally/benchmarks/races/{raceId}/race.json"; + string resultsPath = $"{rallySharedStoragePath}/.rally/benchmarks/races/{raceId}/race.json"; telemetryContext.AddContext("RallyResultsJsonPath", resultsPath); telemetryContext.AddContext("RallyReportCsvPath", reportPath); @@ -332,7 +341,8 @@ private void CaptureMetrics(string[] reportContents, string rallyExecutionArgume reportContents, new Dictionary { - ["elasticsearchVersion"] = this.DistributionVersion, + ["elasticsearchVersion"] = this.ElasticsearchVersion, + ["rallyVersion"] = this.RallyVersion ?? "latest", ["rallyTrack"] = this.TrackName, ["raceId"] = raceId, }); @@ -357,7 +367,7 @@ private void CaptureMetrics(string[] reportContents, string rallyExecutionArgume { throw new WorkloadException( $"Capture metrics failed.", - exc, + exc, ErrorReason.InvalidResults); } } diff --git a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyServerExecutor.cs b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyServerExecutor.cs index 7a5687029d..c56189fe1b 100644 --- a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyServerExecutor.cs +++ b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ElasticsearchRallyServerExecutor.cs @@ -5,17 +5,17 @@ namespace CRC.VirtualClient.Actions { using System; using System.Collections.Generic; - using System.Diagnostics; using System.IO; + using System.IO.Abstractions; using System.Linq; - using System.Net; using System.Threading; using System.Threading.Tasks; using global::VirtualClient; - using global::VirtualClient.Common; + using global::VirtualClient.Common.Extensions; using global::VirtualClient.Common.Telemetry; using global::VirtualClient.Contracts; using Microsoft.Extensions.DependencyInjection; + using static CRC.VirtualClient.Actions.ParallelDownloadHandler; /// /// The Elasticsearch Rally Server workload executor. @@ -32,6 +32,29 @@ public ElasticsearchRallyServerExecutor(IServiceCollection dependencies, IDictio { } + /// + /// This flag enables the use of Wget specifically on Linux systems instead of the + /// default method which is Apt installation. + /// + /// Wget is taking too long downloading in VMs deployed by Juno. + /// Use this flag to switch to Wget if Apt installation is failing or you need a specific Elasticsearch version. + /// + public bool UseWgetForElasticsearhDownloadOnLinux => this.Parameters.GetValue(nameof(ElasticsearchRallyServerExecutor.UseWgetForElasticsearhDownloadOnLinux), false); + + /// + /// Gets a value indicating whether the download of Elasticsearch on Windows uses the WebRequest API instead of the .Net HttpClient + /// the default method. + /// + /// Use this property to control the download mechanism for Elasticsearch on Windows + /// platforms. This may be necessary for compatibility with certain network environments or when the default + /// method is unavailable. + public bool UseWebRequestForElasticsearchDownloadOnWindows => this.Parameters.GetValue(nameof(ElasticsearchRallyServerExecutor.UseWebRequestForElasticsearchDownloadOnWindows), false); + + /// + /// The timeout duration (in milliseconds) to wait for Elasticsearch to become available after starting. + /// + protected int WaitForElasticsearchAvailabilityTimeout { get; set; } = 30000; + /// /// Initializes the environment for execution of the Rally workload. /// @@ -47,7 +70,7 @@ await this.Logger.LogMessageAsync($"{this.TypeName}.ConfigureServer", telemetryC if (!state.ElasticsearchStarted) { - this.StartElasticSearch(telemetryContext); + await this.StartElasticsearch(telemetryContext, cancellationToken); if (!cancellationToken.IsCancellationRequested) { @@ -106,11 +129,42 @@ protected virtual string ReadAllText(string path) return File.ReadAllText(path); } - private void StartElasticSearch(EventContext telemetryContext) + /// + /// Downloads a file in parallel. + /// + /// The URL of the file to download. + /// The destination path where the file will be saved. + /// A token to cancel the operation. + protected virtual async Task ParallelDownloadFile( + string url, + string destinationPath, + CancellationToken cancellationToken) + { + await ParallelDownloadHandler.DownloadFile(url, destinationPath, cancellationToken); + + return; + } + + private async Task StartElasticsearch(EventContext telemetryContext, CancellationToken cancellationToken) + { + if (this.Platform == PlatformID.Unix) + { + this.StartElasticsearchLinux(telemetryContext); + } + else + { + await this.StartElasticsearchWin(telemetryContext, cancellationToken); + } + } + + private void StartElasticsearchLinux(EventContext telemetryContext) { string scriptsDirectory = this.PlatformSpecifics.GetScriptPath(this.PackageName.ToLower()); int port = this.Port; + telemetryContext.AddContext(nameof(scriptsDirectory), scriptsDirectory); + telemetryContext.AddContext(nameof(port), port); + this.RunCommandAsRoot(telemetryContext, "SetVmMaxMapCount", "sysctl -w vm.max_map_count=262144"); // make the change persistent @@ -122,31 +176,11 @@ private void StartElasticSearch(EventContext telemetryContext) this.RunCommandScript(telemetryContext, "LimitMEMLOCKinfinityMkdir", "sudo mkdir -p /etc/systemd/system/elasticsearch.service.d"); this.RunCommandScript(telemetryContext, "LimitMEMLOCKinfinityPersist", "printf \"[Service]\nLimitMEMLOCK=infinity\nLimitMEMLOCKSoft=infinity\n\" | sudo tee /etc/systemd/system/elasticsearch.service.d/override.conf"); - // install elasticsearch - // I tried VirtualClient profile script using LinuxPackageInstallation, but I got a reachability error from Juno deployment: Unable to locate package elasticsearch - // So, I followed Elasticsearch documentation here https://www.elastic.co/guide/en/elasticsearch/reference/current/deb.html - this.RunCommandScript(telemetryContext, "ElasticsearchImport", "curl -fsSL https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg"); - this.RunCommandScript(telemetryContext, "ElasticsearchAdd", "echo \"deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main\" | sudo tee /etc/apt/sources.list.d/elastic-8.x.list"); - this.RunCommandAsRoot(telemetryContext, "ElasticsearchUpdate", "apt update"); - this.RunCommandAsRoot(telemetryContext, "ElasticsearchInstall", "apt install elasticsearch -y"); + // download and install elasticsearch + this.InstallElasticsearchLinux(telemetryContext, scriptsDirectory); // create elasticsearch.yml - string elasticsearchPath = this.PlatformSpecifics.Combine(scriptsDirectory, "elasticsearch.ini"); - if (!this.CheckFileExists(elasticsearchPath)) - { - throw new WorkloadException( - $"The Elasticsearch configuration file (yml) could not be found at the expected path: {elasticsearchPath}", - ErrorReason.WorkloadUnexpectedAnomaly); - } - - telemetryContext.AddContext($"{this.TypeName}.{nameof(elasticsearchPath)}", elasticsearchPath); - this.Logger.LogMessage($"{this.TypeName}.ElasticsearchIniRead", telemetryContext); - string elasticsearchYmlContent = this.ReadAllText(elasticsearchPath); - elasticsearchYmlContent = elasticsearchYmlContent.Replace("$.parameters.port", port.ToString()); - - string elasticsearchPathYml = elasticsearchPath.Replace(".ini", ".yml"); - this.Logger.LogMessage($"{this.TypeName}.ElasticsearchYmlWrite", telemetryContext); - this.WriteAllText(elasticsearchPathYml, elasticsearchYmlContent); + string elasticsearchPathYml = this.CreateElasticsearchYml(telemetryContext, scriptsDirectory, port); this.RunCommandAsRoot(telemetryContext, "ElasticsearchYmlCopy", $"cp {elasticsearchPathYml} /etc/elasticsearch/elasticsearch.yml"); this.RunCommandAsRoot(telemetryContext, "ElasticsearchYml", $"tail -n 10000 /etc/elasticsearch/elasticsearch.yml"); @@ -181,5 +215,179 @@ private void StartElasticSearch(EventContext telemetryContext) this.RunCommandAsRoot(telemetryContext, "ElasticsearchSocket", "ss -lnt"); this.RunCommandAsRoot(telemetryContext, "ElasticsearchUrlCall", $"curl localhost:{port}"); } + + private void InstallElasticsearchLinux(EventContext telemetryContext, string scriptsDirectory) + { + // VirtualClient profile script using LinuxPackageInstallation is throwing a reachability error from Juno deployment: Unable to locate package elasticsearch + // manual installation is not working via wget in VMs deployed by Juno + + if (this.UseWgetForElasticsearhDownloadOnLinux) + { + this.InstallElasticsearchLinuxUsingWget(telemetryContext, scriptsDirectory); + } + else + { + this.InstallElasticsearchLinuxUsingApt(telemetryContext); + } + } + + private void InstallElasticsearchLinuxUsingApt(EventContext telemetryContext) + { + string elasticsearchMajorVersionKey = $"{this.ElasticsearchVersion.Substring(0, 1)}.x"; + telemetryContext.AddContext(nameof(elasticsearchMajorVersionKey), elasticsearchMajorVersionKey); + + // Elasticsearch documentation here https://www.elastic.co/guide/en/elasticsearch/reference/current/deb.html + this.RunCommandScript(telemetryContext, "ElasticsearchImport", "curl -fsSL https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg"); + this.RunCommandScript(telemetryContext, "ElasticsearchAdd", $"echo \"deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/{elasticsearchMajorVersionKey}/apt stable main\" | sudo tee /etc/apt/sources.list.d/elastic-{elasticsearchMajorVersionKey}.list"); + this.RunCommandAsRoot(telemetryContext, "ElasticsearchUpdate", "apt update"); + this.RunCommandAsRoot(telemetryContext, "ElasticsearchInstall", "apt install elasticsearch -y"); + } + + private void InstallElasticsearchLinuxUsingWget(EventContext telemetryContext, string scriptsDirectory) + { + // manual installation is not working via wget in VMs deployed by Juno + + string elasticsearchVersion = this.ElasticsearchVersion; + string platformArchitecture = this.PlatformArchitectureName; + string architecture = platformArchitecture.EndsWith("arm64") ? "arm64" : "amd64"; + string distroVersion = $"{elasticsearchVersion}-{architecture}"; + string downloadCommand = $"wget --no-check-certificate -t=10 --connect-timeout=30 --dns-timeout=30 -P {scriptsDirectory} https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-{distroVersion}.deb"; + + telemetryContext.AddContext(nameof(elasticsearchVersion), elasticsearchVersion); + telemetryContext.AddContext(nameof(platformArchitecture), platformArchitecture); + telemetryContext.AddContext(nameof(distroVersion), distroVersion); + telemetryContext.AddContext(nameof(downloadCommand), downloadCommand); + + this.Logger.LogMessage($"{this.TypeName}.ElasticsearchDownloadStart", telemetryContext); + + // I followed Elasticsearch documentation here https://www.elastic.co/docs/deploy-manage/deploy/self-managed/install-elasticsearch-with-debian-package + this.RunCommandAsRoot(telemetryContext, "ElasticsearchDownload", downloadCommand, true); + this.RunCommandAsRoot(telemetryContext, "ElasticsearchDownloadSHA", $"{downloadCommand}.sha512", true); + this.RunCommandScript(telemetryContext, "ElasticsearchCheckSHA", $"cd {scriptsDirectory} && shasum -a 512 -c elasticsearch-{distroVersion}.sha512", true); + this.RunCommandAsRoot(telemetryContext, "ElasticsearchInstall", $"dpkg -i {scriptsDirectory}/elasticsearch-{distroVersion}", true); + } + + private string CreateElasticsearchYml(EventContext telemetryContext, string scriptsDirectory, int port) + { + string elasticsearchPath = this.PlatformSpecifics.Combine(scriptsDirectory, "elasticsearch.ini"); + if (!this.CheckFileExists(elasticsearchPath)) + { + throw new WorkloadException( + $"The Elasticsearch configuration file (yml) could not be found at the expected path: {elasticsearchPath}", + ErrorReason.WorkloadUnexpectedAnomaly); + } + + telemetryContext.AddContext($"{this.TypeName}.{nameof(elasticsearchPath)}", elasticsearchPath); + this.Logger.LogMessage($"{this.TypeName}.ElasticsearchIniRead", telemetryContext); + string elasticsearchYmlContent = this.ReadAllText(elasticsearchPath); + elasticsearchYmlContent = elasticsearchYmlContent.Replace("$.parameters.port", port.ToString()); + + string elasticsearchPathYml = elasticsearchPath.Replace(".ini", ".yml"); + this.Logger.LogMessage($"{this.TypeName}.ElasticsearchYmlWrite", telemetryContext); + this.WriteAllText(elasticsearchPathYml, elasticsearchYmlContent); + + return elasticsearchPathYml; + } + + private async Task StartElasticsearchWin(EventContext telemetryContext, CancellationToken cancellationToken) + { + int port = this.Port; + string elasticsearchVersion = this.ElasticsearchVersion; + telemetryContext.AddContext(nameof(elasticsearchVersion), elasticsearchVersion); + telemetryContext.AddContext(nameof(port), port); + + // create install directory + string installDir = await this.GetDataDirectoryAsync(cancellationToken); + + if (!this.CheckDirectoryExists(installDir)) + { + throw new WorkloadException( + $"The Elasticsearch data directory could not be found at the expected path: {installDir}", + ErrorReason.WorkloadUnexpectedAnomaly); + } + + installDir = System.IO.Path.Combine(installDir, "elasticsearch"); + Directory.CreateDirectory(installDir); + + // download elasticsearch + string elasticsearchBase = $"elasticsearch-{elasticsearchVersion}"; + string platformArchitecture = this.PlatformArchitectureName; + string architecture = platformArchitecture.EndsWith("arm64") ? "arm_64" : "x86_64"; + string distroVersion = $"{elasticsearchBase}-windows-{architecture}.zip"; + telemetryContext.AddContext(nameof(elasticsearchVersion), elasticsearchVersion); + telemetryContext.AddContext(nameof(platformArchitecture), platformArchitecture); + telemetryContext.AddContext(nameof(distroVersion), distroVersion); + + string downloadUrl = $"https://artifacts.elastic.co/downloads/elasticsearch/{distroVersion}"; + string zipPath = System.IO.Path.Combine(installDir, $"{elasticsearchBase}.zip"); + + await this.DownloadFileAsync(telemetryContext, downloadUrl, zipPath, cancellationToken); + + // extract + string psExtract = $@" +if (-not (Test-Path '{installDir}')) {{ New-Item -ItemType Directory -Path '{installDir}' | Out-Null; }} +Expand-Archive -Path '{zipPath}' -DestinationPath '{installDir}' -Force; +"; + this.RunCommandWindowsScript(telemetryContext, "ElasticsearchExtract", psExtract); + + string dirContents = string.Join(';', Directory.GetDirectories(installDir).Select(x => x.Split('\\').Last())); + telemetryContext.AddContext($"{this.TypeName}.{nameof(dirContents)}", dirContents); + + string homeDir = System.IO.Path.Combine(installDir, elasticsearchBase); + telemetryContext.AddContext($"{this.TypeName}.{nameof(homeDir)}", homeDir); + + // create elasticsearch.yml + string scriptsDirectory = this.PlatformSpecifics.GetScriptPath(this.PackageName.ToLower()); + string elasticsearchPathYml = this.CreateElasticsearchYml(telemetryContext, scriptsDirectory, port); + + this.FileCopy(elasticsearchPathYml, Path.Combine(homeDir, "config", "elasticsearch.yml"), true); + + // open firewall + // string psFirewall = $"New-NetFirewallRule -DisplayName ElasticsearchInbound -Direction Inbound -Protocol TCP -LocalPort {port} -Action Allow -Profile Any"; + string psFirewall = "Set-NetFirewallProfile -Profile Domain,Public,Private -Enabled False"; + this.RunCommandWindowsScript(telemetryContext, "ElasticsearchOpenFirewall", psFirewall); + + // start detached + string args = $"-E http.port={port}"; + string elasticBat = Path.Combine(homeDir, "bin", "elasticsearch.bat"); + string startDetached = $"/c start {elasticBat} {args}"; + + this.RunCommandWindowsScriptDetached(telemetryContext, "ElasticsearchStart", startDetached); + + await Task.Delay(this.WaitForElasticsearchAvailabilityTimeout); // wait for elasticsearch to start + + // verify elasticsearch is running + + string psPing = @" +try { + $r = Invoke-WebRequest -UseBasicParsing -Uri 'http://localhost:" + port + @"' -TimeoutSec 15 + Write-Output ($r.StatusCode) +} catch { + Write-Output ('ERROR: ' + $_.Exception.Message) +} +"; + this.RunCommandWindowsScript(telemetryContext, "ElasticsearchPing", psPing); + } + + private async Task DownloadFileAsync(EventContext telemetryContext, string downloadUrl, string zipPath, CancellationToken cancellationToken) + { + telemetryContext.AddContext(nameof(downloadUrl), downloadUrl); + telemetryContext.AddContext(nameof(zipPath), zipPath); + await this.Logger.LogMessageAsync($"{this.TypeName}.DownloadFile", telemetryContext.Clone(), async () => + { + // WebRequest is taking too long to download the file, using parallel download handler as default. + if (this.UseWebRequestForElasticsearchDownloadOnWindows) + { + this.RunCommandWindowsScript(telemetryContext, "ElasticsearchDownload", $"Invoke-WebRequest -Uri '{downloadUrl}' -OutFile '{zipPath}' -UseBasicParsing"); + } + else + { + // .Net HttpClient based parallel download + await this.ParallelDownloadFile(downloadUrl, zipPath, cancellationToken); + } + }); + + return; + } } } diff --git a/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ParallelDownloadHandler.cs b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ParallelDownloadHandler.cs new file mode 100644 index 0000000000..eb3baa7a64 --- /dev/null +++ b/src/VirtualClient/VirtualClient.Actions/ElasticsearchRally/ParallelDownloadHandler.cs @@ -0,0 +1,265 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace CRC.VirtualClient.Actions +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.IO; + using System.IO.Abstractions; + using System.Linq; + using System.Net; + using System.Net.Http; + using System.Net.Http.Headers; + using System.Threading; + using System.Threading.Tasks; + + internal class ParallelDownloadHandler + { + /// + /// Defines an interface for handling parallel file downloads, providing access to HTTP and file system + /// operations. + /// + /// + /// Implementations of this interface provide the necessary HTTP client and file system abstractions + /// to support parallel downloading of files. + /// Unit test implementations can mock this interface to simulate different download scenarios. + /// + public interface IParallelDownloadHandler + { + public HttpClient? HttpClient { get; set; } + + public bool FileExists { get; set; } + + public long FileLength { get; set; } + + FileStream CreateFileStream(string destinationPath); + } + + /// + /// Downloads a file from the specified URL to the destination path with optional parallel download support. + /// Automatically detects if the server supports range requests and switches between parallel and single-threaded download modes. + /// + /// The URL of the file to download. + /// The local file path where the downloaded file will be saved. + /// A cancellation token to cancel the download operation. + /// The maximum number of parallel connections to use for downloading. Default is 8. Only applies if the server supports range requests. + /// The HTTP request timeout in seconds. Default is 30 seconds. + /// The size of each download chunk in megabytes when using parallel download. Default is 10 MB. + /// An optional parallel download handler instance to use for the download. If not provided, a new HttpClient will be created. + /// A task representing the asynchronous download operation. + public static async Task DownloadFile( + string url, + string destinationPath, + CancellationToken cancellationToken, + int parallel = 8, + int timeout = 30, + int chunkMb = 10, + IParallelDownloadHandler? parallelDownloadHandler = null) + { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + + var sw = Stopwatch.StartNew(); + try + { + HttpClient? httpClient = parallelDownloadHandler?.HttpClient; + HttpClient http; + + if (httpClient == null) + { + // ---- Build a tuned SocketsHttpHandler ---- + var handler = new SocketsHttpHandler + { + AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate, // save bandwidth + MaxConnectionsPerServer = Math.Max(1, parallel), // lift per-host concurrency for ranges + AllowAutoRedirect = false, + PooledConnectionLifetime = TimeSpan.FromMinutes(5) + }; + http = new HttpClient(handler) { Timeout = TimeSpan.FromSeconds(timeout) }; + } + else + { + http = httpClient; + } + + // HEAD probe + var headReq = new HttpRequestMessage(HttpMethod.Head, url); + using var head = await http.SendAsync(headReq, HttpCompletionOption.ResponseHeadersRead, cts.Token); + head.EnsureSuccessStatusCode(); + + long? length = head.Content.Headers.ContentLength; + bool supportsRanges = head.Headers.AcceptRanges.Contains("bytes"); + string serverName = head.Headers.Server?.ToString() ?? string.Empty; + + bool fileExists; + long existingLength; + + if (parallelDownloadHandler == null) + { + fileExists = File.Exists(destinationPath); + existingLength = fileExists ? new FileInfo(destinationPath).Length : 0; + } + else + { + fileExists = parallelDownloadHandler.FileExists; + existingLength = fileExists ? parallelDownloadHandler.FileLength : 0; + } + + // Decide parallel vs single + if (supportsRanges && length > 0 && parallel > 1) + { + int chunkSize = Math.Max(1, chunkMb) * 1024 * 1024; + await DownloadParallelAsync(http, url, destinationPath, length.Value, chunkSize, parallel, cts.Token); + } + else + { + await DownloadSingleAsync(http, url, destinationPath, cts.Token, parallelDownloadHandler); + } + } + catch (OperationCanceledException) + { + return; + } + } + + private static async Task DownloadSingleAsync( + HttpClient http, + string url, + string destinationPath, + CancellationToken ct, + IParallelDownloadHandler? parallelDownloadHandler) + { + using var req = new HttpRequestMessage(HttpMethod.Get, url); + + using var resp = await http.SendAsync(req, HttpCompletionOption.ResponseHeadersRead, ct); + resp.EnsureSuccessStatusCode(); + + var total = resp.Content.Headers.ContentLength ?? 0; + using var src = await resp.Content.ReadAsStreamAsync(ct); + + using FileStream dst = + parallelDownloadHandler == null ? + new FileStream(destinationPath, FileMode.Create, FileAccess.Write, FileShare.None, 1 << 20, useAsync: true) : + parallelDownloadHandler.CreateFileStream(destinationPath); + + var buffer = new byte[1 << 20]; // 1MB buffer + long written = 0; + int read; + + while ((read = await src.ReadAsync(buffer.AsMemory(0, buffer.Length), ct)) > 0) + { + await dst.WriteAsync(buffer.AsMemory(0, read), ct); + written += read; + } + } + + private static async Task DownloadParallelAsync( + HttpClient http, + string url, + string destinationPath, + long totalLength, + int chunkSize, + int parallel, + CancellationToken ct) + { + var ranges = BuildRanges(totalLength, chunkSize); + + // Create temp part files + var tempDir = Path.Combine(Path.GetDirectoryName(Path.GetFullPath(destinationPath)) !, $".{Path.GetFileName(destinationPath)}.parts"); + Directory.CreateDirectory(tempDir); + + object gate = new (); + + var throttler = new SemaphoreSlim(parallel); + var tasks = ranges.Select(async (r, i) => + { + await throttler.WaitAsync(ct); + try + { + var partPath = Path.Combine(tempDir, $"{i:D8}.part"); + var success = await DownloadRangeWithRetryAsync(http, url, r, partPath, ct, progress: bytes => { /* in progress */ }); + // if (!success) throw new Exception($"Failed to download range {r.start}-{r.end}"); + } + finally + { + throttler.Release(); + } + }); + + await Task.WhenAll(tasks); + + // wrap up + using (var dst = new FileStream(destinationPath, FileMode.Create, FileAccess.Write, FileShare.None, 1 << 20, useAsync: true)) + { + for (int i = 0; i < ranges.Count; i++) + { + var part = Path.Combine(tempDir, $"{i:D8}.part"); + using var src = new FileStream(part, FileMode.Open, FileAccess.Read, FileShare.Read, 1 << 20, useAsync: true); + await src.CopyToAsync(dst, 1 << 20, ct); + } + } + + Directory.Delete(tempDir, recursive: true); + } + + private static async Task DownloadRangeWithRetryAsync( + HttpClient http, + string url, + (long start, long end) range, + string partPath, + CancellationToken ct, + Action? progress) + { + const int maxAttempts = 5; + var rng = new Random(); + + for (int attempt = 1; attempt <= maxAttempts; attempt++) + { + try + { + using var req = new HttpRequestMessage(HttpMethod.Get, url); + req.Headers.Range = new RangeHeaderValue(range.start, range.end); + + using var resp = await http.SendAsync(req, HttpCompletionOption.ResponseHeadersRead, ct); + resp.EnsureSuccessStatusCode(); + + using var src = await resp.Content.ReadAsStreamAsync(ct); + using var dst = new FileStream(partPath, FileMode.Create, FileAccess.Write, FileShare.None, 1 << 20, useAsync: true); + + var buffer = new byte[1 << 20]; + int read; + long written = 0; + while ((read = await src.ReadAsync(buffer.AsMemory(0, buffer.Length), ct)) > 0) + { + await dst.WriteAsync(buffer.AsMemory(0, read), ct); + written += read; + progress?.Invoke(read); + } + + return true; + } + catch when (attempt < maxAttempts) + { + // jittered exponential backoff + var delayMs = (int)Math.Min(30000, (Math.Pow(2, attempt) * 250) + rng.Next(0, 250)); + await Task.Delay(delayMs, ct); + } + } + + return false; + } + + private static List<(long start, long end)> BuildRanges(long total, int size) + { + var ranges = new List<(long, long)>(); + for (long start = 0; start < total; start += size) + { + long end = Math.Min(total - 1, start + size - 1); + ranges.Add((start, end)); + } + + return ranges; + } + } +} diff --git a/src/VirtualClient/VirtualClient.Main/profiles/PERF-ELASTICSEARCH-RALLY.json b/src/VirtualClient/VirtualClient.Main/profiles/PERF-ELASTICSEARCH-RALLY.json index 44e7ef5f7e..056ffae2fb 100644 --- a/src/VirtualClient/VirtualClient.Main/profiles/PERF-ELASTICSEARCH-RALLY.json +++ b/src/VirtualClient/VirtualClient.Main/profiles/PERF-ELASTICSEARCH-RALLY.json @@ -2,14 +2,17 @@ "Description": "Elasticsearch Rally Workload", "Metadata": { "RecommendedMinimumExecutionTime": "24:00:00", - "SupportedPlatforms": "linux-x64,linux-arm64", - "SupportedOperatingSystems": "Debian,Ubuntu" + "SupportedPlatforms": "win-x64,win-arm64,linux-x64,linux-arm64", + "SupportedOperatingSystems": "Debian,Ubuntu,Windows" }, "Parameters": { "DiskFilter": "osdisk:false&biggestsize", - "DistributionVersion": "8.0.0", + "ElasticsearchVersion": "9.2.3", + "RallyVersion": "2.12.0", "Port": "9200", - "RallyTestMode": false + "RallyTestMode": false, + "UseWgetForElasticsearhDownloadOnLinux": false, + "UseWebRequestForElasticsearchDownloadOnWindows": false }, "Actions": [ { @@ -17,9 +20,12 @@ "Parameters": { "Scenario": "SetupElasticsearchRallyCluster", "Port": "$.Parameters.Port", + "ElasticsearchVersion": "$.Parameters.ElasticsearchVersion", + "UseWgetForElasticsearhDownloadOnLinux": "$.Parameters.UseWgetForElasticsearhDownloadOnLinux", + "UseWebRequestForElasticsearchDownloadOnWindows": "$.Parameters.UseWebRequestForElasticsearchDownloadOnWindows", "DiskFilter": "$.Parameters.DiskFilter", - "Role": "Server", - "PackageName": "elasticsearchrally" + "PackageName": "elasticsearchrally", + "Role": "Server" } }, { @@ -27,7 +33,8 @@ "Parameters": { "Scenario": "ExecuteGeoNamesBenchmark", "TrackName": "geonames", - "DistributionVersion": "$.Parameters.DistributionVersion", + "ElasticsearchVersion": "$.Parameters.ElasticsearchVersion", + "RallyVersion": "$.Parameters.RallyVersion", "Port": "$.Parameters.Port", "DiskFilter": "$.Parameters.DiskFilter", "RallyTestMode": "$.Parameters.RallyTestMode", @@ -39,7 +46,8 @@ "Parameters": { "Scenario": "ExecutePMCBenchmark", "TrackName": "pmc", - "DistributionVersion": "$.Parameters.DistributionVersion", + "ElasticsearchVersion": "$.Parameters.ElasticsearchVersion", + "RallyVersion": "$.Parameters.RallyVersion", "Port": "$.Parameters.Port", "DiskFilter": "$.Parameters.DiskFilter", "RallyTestMode": "$.Parameters.RallyTestMode", @@ -51,7 +59,8 @@ "Parameters": { "Scenario": "ExecuteGeopointBenchmark", "TrackName": "geopoint", - "DistributionVersion": "$.Parameters.DistributionVersion", + "ElasticsearchVersion": "$.Parameters.ElasticsearchVersion", + "RallyVersion": "$.Parameters.RallyVersion", "Port": "$.Parameters.Port", "DiskFilter": "$.Parameters.DiskFilter", "RallyTestMode": "$.Parameters.RallyTestMode", @@ -63,7 +72,8 @@ "Parameters": { "Scenario": "ExecuteHTTPLogsBenchmark", "TrackName": "http_logs", - "DistributionVersion": "$.Parameters.DistributionVersion", + "ElasticsearchVersion": "$.Parameters.ElasticsearchVersion", + "RallyVersion": "$.Parameters.RallyVersion", "Port": "$.Parameters.Port", "DiskFilter": "$.Parameters.DiskFilter", "RallyTestMode": "$.Parameters.RallyTestMode", From 124e286082713b26f5c7b69973a57561b3bc70dd Mon Sep 17 00:00:00 2001 From: "Samuel Filho (WIPRO LIMITED)" Date: Mon, 12 Jan 2026 14:31:39 -0300 Subject: [PATCH 3/3] temp directory for linux unit test fix --- .../ElasticsearchRally/ParallelDownloadHandlerTests.cs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ParallelDownloadHandlerTests.cs b/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ParallelDownloadHandlerTests.cs index 8e7a724f64..e2754776e1 100644 --- a/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ParallelDownloadHandlerTests.cs +++ b/src/VirtualClient/VirtualClient.Actions.UnitTests/ElasticsearchRally/ParallelDownloadHandlerTests.cs @@ -8,6 +8,7 @@ namespace VirtualClient.Actions using System.IO; using System.Net; using System.Net.Http; + using System.Reflection; using System.Threading; using System.Threading.Tasks; using CRC.VirtualClient.Actions; @@ -30,7 +31,10 @@ public void SetupTest() this.Setup(PlatformID.Unix); this.testUrl = "https://example.com/testfile.zip"; - this.testDestinationPath = this.Combine(this.GetPackagePath(), "testfile.zip"); + + string tempPath = Path.GetTempPath(); + + this.testDestinationPath = this.Combine(tempPath, "testfile.zip"); this.mockHttpMessageHandler = new Mock();