From 7bd9b75a512cc05edbf57f76b1fbdb6ebb82764f Mon Sep 17 00:00:00 2001 From: Pasca Robert-Paul Date: Mon, 8 Jun 2026 08:52:58 +0000 Subject: [PATCH] DPL: add metric aggregation based on policies --- Framework/Core/CMakeLists.txt | 2 + .../include/Framework/AggregationPolicy.h | 82 ++++ .../Core/include/Framework/MetricAggregator.h | 81 ++++ Framework/Core/src/AggregationPolicy.cxx | 149 +++++++ Framework/Core/src/MetricAggregator.cxx | 397 ++++++++++++++++++ Framework/Core/src/runDataProcessing.cxx | 21 + 6 files changed, 732 insertions(+) create mode 100644 Framework/Core/include/Framework/AggregationPolicy.h create mode 100644 Framework/Core/include/Framework/MetricAggregator.h create mode 100644 Framework/Core/src/AggregationPolicy.cxx create mode 100644 Framework/Core/src/MetricAggregator.cxx diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index 45af3ad6c59cc..13c1509e2e2f2 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -119,6 +119,8 @@ o2_add_library(Framework src/PluginManager.cxx src/RateLimiter.cxx src/ResourcesMonitoringHelper.cxx + src/AggregationPolicy.cxx + src/MetricAggregator.cxx src/ResourcePolicy.cxx src/ResourcePolicyHelpers.cxx src/RootArrowFilesystem.cxx diff --git a/Framework/Core/include/Framework/AggregationPolicy.h b/Framework/Core/include/Framework/AggregationPolicy.h new file mode 100644 index 0000000000000..a8e4799716cf7 --- /dev/null +++ b/Framework/Core/include/Framework/AggregationPolicy.h @@ -0,0 +1,82 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef O2_FRAMEWORK_METRICAGGREGATOR_AGGREGATIONPOLICY_H +#define O2_FRAMEWORK_METRICAGGREGATOR_AGGREGATIONPOLICY_H + +#include +#include +#include +#include + +namespace o2 +{ + +namespace framework +{ + +namespace metricaggregator +{ +/// Defines the selection strategy for devices. +enum class AggregationSelectionType { + All, + Specific +}; +/// Defines the reduction strategy for metrics. +enum class AggregationMetricType { + Sum, + Average, + Rate, + Specific, + Simple +}; + +/// Parses environment configurations and evaluates aggregation rules. +class AggregationPolicy +{ + public: + AggregationPolicy() = default; + ~AggregationPolicy() = default; + /// Reads configuration from environment variables and sets internal rules. + void configureFromEnv(); + /// Returns the configured device selection type. + AggregationSelectionType getSelection() const noexcept; + /// Returns the configured global metric reduction type. + AggregationMetricType getReduction() const noexcept; + /// Determines the specific reduction type required for a given metric name. + AggregationMetricType getAggregationTypeForMetric(std::string_view metricName) const; + /// Evaluates whether the policy allows processing for the provided device name. + bool selectDevice(std::string_view deviceId) const; + + private: + /// Maps a regular expression pattern to a specific aggregation type. + struct MetricRule { + std::regex metricPattern; + AggregationMetricType type; + }; + + std::vector split(std::string_view input, char delim) const; + /// Converts a string literal into an AggregationSelectionType enum. + AggregationSelectionType parseSelectionType(const std::string& str); + /// Converts a string literal into an AggregationMetricType enum. + AggregationMetricType parseReductionType(const std::string& str); + + AggregationSelectionType mSelection = AggregationSelectionType::All; + AggregationMetricType mReduction = AggregationMetricType::Sum; + std::vector mSpecificDevices; + std::vector mSpecificMetricRules; +}; + +} // namespace metricaggregator +} // namespace framework +} // namespace o2 + +#endif // O2_FRAMEWORK_METRICAGGREGATOR_AGGREGATIONPOLICY_H diff --git a/Framework/Core/include/Framework/MetricAggregator.h b/Framework/Core/include/Framework/MetricAggregator.h new file mode 100644 index 0000000000000..08dafdeea2fa9 --- /dev/null +++ b/Framework/Core/include/Framework/MetricAggregator.h @@ -0,0 +1,81 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef O2_FRAMEWORK_METRICAGGREGATOR_METRICAGGREGATOR_H +#define O2_FRAMEWORK_METRICAGGREGATOR_METRICAGGREGATOR_H + +#include "Framework/ServiceHandle.h" +#include "Framework/ServiceMetricsInfo.h" +#include "Framework/Monitoring.h" + +#include +#include +#include +#include +#include + +#include "Framework/AggregationPolicy.h" + +namespace o2 +{ + +namespace framework +{ + +namespace metricaggregator +{ +/// Stores a single numeric measurement and its associated timestamp. +struct MetricSample { + double value = 0.0; + std::size_t timestamp = 0; +}; + +/// Collects and reduces metrics across multiple framework devices. +/// Transmits the aggregated results to an external monitoring backend. +class MetricAggregator +{ + public: + explicit MetricAggregator(); + ~MetricAggregator() = default; + /// Initializes the internal aggregation policy from environment variables. + void setPolicy(); + /// Returns the current policy configuration as a formatted string. + std::string getPolicy(); + /// Routes metrics to the appropriate processing function based on the policy reduction type. + void mergeMetrics(const std::vector& metrics, + const DeviceMetricsInfo& driverMetrics, + const std::vector& specs); + + private: + /// Appends a suffix to the metric name based on the applied aggregation type. + std::string getMetricNameFromPolicy(std::string_view metricName, AggregationMetricType aggregationType); + /// Flushes metrics directly without aggregation. + void flushMetricsSimple(const std::vector& deviceMetrics, + const DeviceMetricsInfo& driverMetrics, + const std::vector& specs); + /// Flushes metrics by applying the aggregation policy. + void flushMetrics(const std::vector& deviceMetrics, + const DeviceMetricsInfo& driverMetrics, + const std::vector& specs); + /// Retrieves the monitoring backend type from environment variables. + const char* getBackendFromEnv(); + + const char* mBackend = nullptr; + std::unique_ptr mMonitoring; + /// Stores the previous samples required to compute rates over time. + std::unordered_map> mLastSentSamples; + std::unique_ptr mPolicy; +}; +} // namespace metricaggregator +} // namespace framework +} // namespace o2 + +#endif // O2_FRAMEWORK_METRICAGGREGATOR_METRICAGGREGATOR_H diff --git a/Framework/Core/src/AggregationPolicy.cxx b/Framework/Core/src/AggregationPolicy.cxx new file mode 100644 index 0000000000000..2bae174858b0a --- /dev/null +++ b/Framework/Core/src/AggregationPolicy.cxx @@ -0,0 +1,149 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include "Framework/AggregationPolicy.h" +#include "Framework/Logger.h" + +#include +#include +#include +#include +#include +#include +#include + +using namespace o2::framework::metricaggregator; + +std::vector AggregationPolicy::split(std::string_view input, char delim) const +{ + std::vector tokens; + std::string token; + std::istringstream tokenStream{std::string(input)}; + while (std::getline(tokenStream, token, delim)) { + tokens.push_back(token); + } + return tokens; +} + +void AggregationPolicy::configureFromEnv() +{ + const char* envPolicy = std::getenv("ALIEN_JDL_AGGREGATOR_POLICY"); + if (!envPolicy) { + LOGP(warn, "[AggregationPolicy] ALIEN_JDL_AGGREGATOR_POLICY is not set. Using default 'all:simple'."); + mSelection = AggregationSelectionType::All; + mReduction = AggregationMetricType::Simple; + return; + } + + try { + std::string policyStr(envPolicy); + std::vector parts = split(policyStr, ':'); + + if (parts.size() < 2) { + LOGP(error, "[AggregationPolicy] Invalid ALIEN_JDL_AGGREGATOR_POLICY format"); + return; + } + + mSelection = parseSelectionType(parts[0]); + mReduction = parseReductionType(parts[1]); + + if (mSelection == AggregationSelectionType::Specific) { + const char* envDevices = std::getenv("ALIEN_JDL_AGGREGATOR_DEVICES"); + if (!envDevices) { + throw std::invalid_argument("ALIEN_JDL_AGGREGATOR_DEVICES environment variable is required when selection type is 'specific'"); + } + mSpecificDevices = split(std::string(envDevices), ','); + } + if (mReduction == AggregationMetricType::Specific) { + const char* envMetrics = std::getenv("ALIEN_JDL_AGGREGATOR_METRICS"); + if (!envMetrics) { + LOGP(warn, "[AggregationPolicy] ALIEN_JDL_AGGREGATOR_METRICS environment variable missing for 'specific' reduction type. Using default."); + mSpecificMetricRules.push_back({std::regex(".*"), AggregationMetricType::Sum}); + return; + } + + std::stringstream metricsStream(envMetrics); + std::string metricRuleStr; + while (std::getline(metricsStream, metricRuleStr, ';')) { + auto pos = metricRuleStr.find(':'); + if (pos == std::string::npos) { + throw std::invalid_argument("Invalid metric rule format: " + metricRuleStr); + } + std::string typeStr = metricRuleStr.substr(0, pos); + std::string patternStr = metricRuleStr.substr(pos + 1); + AggregationMetricType type = parseReductionType(typeStr); + mSpecificMetricRules.push_back({std::regex(patternStr), type}); + } + } + } catch (std::exception const& e) { + LOGP(error, "[AggregationPolicy] Failed to parse ALIEN_JDL_AGGREGATOR_POLICY: {}", e.what()); + } +} + +AggregationMetricType AggregationPolicy::getAggregationTypeForMetric(std::string_view metricName) const +{ + if (mReduction != AggregationMetricType::Specific) { + return mReduction; + } + for (const auto& rule : mSpecificMetricRules) { + if (std::regex_match(std::string(metricName), rule.metricPattern)) { + return rule.type; + } + } + if (mReduction == AggregationMetricType::Specific) { + LOGP(error, "[AggregationPolicy] No specific aggregation type found for metric '{}'", metricName); + } + throw std::invalid_argument("No specific aggregation type found for metric: " + std::string(metricName)); +} + +AggregationSelectionType AggregationPolicy::getSelection() const noexcept +{ + return mSelection; +} + +AggregationMetricType AggregationPolicy::getReduction() const noexcept +{ + return mReduction; +} + +AggregationSelectionType AggregationPolicy::parseSelectionType(const std::string& str) +{ + if (str == "all") { + return AggregationSelectionType::All; + } else if (str == "specific") { + return AggregationSelectionType::Specific; + } + throw std::invalid_argument("Invalid selection type: " + str); +} + +AggregationMetricType AggregationPolicy::parseReductionType(const std::string& str) +{ + if (str == "sum") { + return AggregationMetricType::Sum; + } else if (str == "average") { + return AggregationMetricType::Average; + } else if (str == "rate") { + return AggregationMetricType::Rate; + } else if (str == "simple") { + return AggregationMetricType::Simple; + } else if (str == "specific") { + return AggregationMetricType::Specific; + } + throw std::invalid_argument("Invalid reduction type: " + str); +} + +bool AggregationPolicy::selectDevice(std::string_view deviceId) const +{ + if (mSelection == AggregationSelectionType::Specific) { + return std::find(mSpecificDevices.begin(), mSpecificDevices.end(), deviceId) != mSpecificDevices.end(); + } + return true; +} diff --git a/Framework/Core/src/MetricAggregator.cxx b/Framework/Core/src/MetricAggregator.cxx new file mode 100644 index 0000000000000..a1c666c8b05cc --- /dev/null +++ b/Framework/Core/src/MetricAggregator.cxx @@ -0,0 +1,397 @@ +// Copyright 2019-2025 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include "Framework/MetricAggregator.h" + +#include "Framework/CommonServices.h" +#include "Framework/DeviceMetricsInfo.h" +#include "Framework/Logger.h" +#include "Framework/Monitoring.h" +#include "Framework/Plugins.h" +#include "Framework/ServiceSpec.h" +#include "Framework/TypeIdHelpers.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace o2::framework; +using Metric = o2::monitoring::Metric; +#define MONITORING_QUEUE_SIZE 100 + +namespace o2::framework::metricaggregator +{ +/// Reads the most recent value from the ring buffer based on the write position and the history offset. +template +bool getNumericValue(std::size_t storeIdx, + std::size_t writePos, + std::size_t filledMetrics, + std::size_t historyOffset, + std::vector> const& valuesStorage, + std::vector> const& timestampsStorage, + MetricSample& out) +{ + if (storeIdx >= valuesStorage.size() || storeIdx >= timestampsStorage.size()) { + return false; + } + + auto const& values = valuesStorage[storeIdx]; + auto const& timestamps = timestampsStorage[storeIdx]; + auto const capacity = values.size(); + if (capacity == 0 || capacity <= historyOffset) { + return false; + } + + auto const ringIndex = (writePos + capacity - 1 - historyOffset) % capacity; + out.value = static_cast(values[ringIndex]); + out.timestamp = timestamps[ringIndex]; + return true; +} + +/// Routes the metric extraction request to the correct typed storage array. +bool extractNumericSample(DeviceMetricsInfo const& info, + std::size_t metricIndex, + std::size_t historyOffset, + MetricSample& out) +{ + if (metricIndex >= info.metrics.size()) { + return false; + } + + auto const& metric = info.metrics[metricIndex]; + switch (metric.type) { + case MetricType::Int: { + return getNumericValue(metric.storeIdx, + metric.pos, + metric.filledMetrics, + historyOffset, + info.intMetrics, + info.intTimestamps, + out); + } + case MetricType::Uint64: { + return getNumericValue(metric.storeIdx, + metric.pos, + metric.filledMetrics, + historyOffset, + info.uint64Metrics, + info.uint64Timestamps, + out); + } + case MetricType::Float: { + return getNumericValue(metric.storeIdx, + metric.pos, + metric.filledMetrics, + historyOffset, + info.floatMetrics, + info.floatTimestamps, + out); + } + case MetricType::Enum: { + return getNumericValue(metric.storeIdx, + metric.pos, + metric.filledMetrics, + historyOffset, + info.enumMetrics, + info.enumTimestamps, + out); + } + case MetricType::String: + case MetricType::Unknown: + return false; + } + return false; +} + +bool findMetricIndexByName(DeviceMetricsInfo const& info, + std::string_view metricName, + std::size_t& metricIndex) +{ + auto const nMetrics = std::min(info.metrics.size(), info.metricLabels.size()); + for (std::size_t i = 0; i < nMetrics; ++i) { + auto const& label = info.metricLabels[i]; + std::string_view currentName{label.label, label.size}; + if (currentName == metricName) { + metricIndex = i; + return true; + } + } + return false; +} + +MetricAggregator::MetricAggregator() +{ + mBackend = getBackendFromEnv(); + if (mBackend == nullptr) { + LOGP(error, "[MetricAggregator] No backend configured, skipping initialization"); + return; + } + + mMonitoring = o2::monitoring::MonitoringFactory::Get(mBackend); + if (mMonitoring == nullptr) { + LOGP(error, "[MetricAggregator] Failed to create monitoring backend for '{}'", mBackend); + return; + } + mMonitoring->enableBuffering(MONITORING_QUEUE_SIZE); + setPolicy(); + LOGP(info, "[MetricAggregator] Initialized with policy '{}'", getPolicy()); +} + +void MetricAggregator::setPolicy() +{ + mPolicy = std::make_unique(); + mPolicy->configureFromEnv(); +} + +std::string MetricAggregator::getPolicy() +{ + std::stringstream ss; + std::string selectionStr; + switch (mPolicy->getSelection()) { + case AggregationSelectionType::All: + selectionStr = "All"; + break; + case AggregationSelectionType::Specific: + selectionStr = "Specific"; + break; + } + std::string reductionStr; + switch (mPolicy->getReduction()) { + case AggregationMetricType::Sum: + reductionStr = "Sum"; + break; + case AggregationMetricType::Average: + reductionStr = "Average"; + break; + case AggregationMetricType::Rate: + reductionStr = "Rate"; + break; + case AggregationMetricType::Simple: + reductionStr = "Simple"; + break; + case AggregationMetricType::Specific: + reductionStr = "Specific"; + break; + } + ss << selectionStr << ":" << reductionStr; + return ss.str(); +} + +void MetricAggregator::mergeMetrics(const std::vector& metrics, + const DeviceMetricsInfo& driverMetrics, + const std::vector& specs) +{ + if (mPolicy->getReduction() == AggregationMetricType::Simple) { + flushMetricsSimple(metrics, driverMetrics, specs); + } else { + flushMetrics(metrics, driverMetrics, specs); + } +} + +std::string MetricAggregator::getMetricNameFromPolicy(std::string_view metricName, AggregationMetricType aggregationType) +{ + std::string metricNameStr{metricName}; + if (aggregationType == AggregationMetricType::Sum) { + metricNameStr += "_sum"; + } else if (aggregationType == AggregationMetricType::Rate) { + metricNameStr += "_R"; + } else if (aggregationType == AggregationMetricType::Average) { + metricNameStr += "_avg"; + } + return metricNameStr; +} + +void MetricAggregator::flushMetricsSimple(const std::vector& deviceMetricsInfo, + const DeviceMetricsInfo& driverMetrics, + const std::vector& specs) +{ + auto const nDevices = std::min(deviceMetricsInfo.size(), specs.size()); + if (nDevices == 0) { + return; + } + + for (std::size_t di = 0; di < nDevices; ++di) { + auto const& deviceId = specs[di].id; + + if (mPolicy && !mPolicy->selectDevice(deviceId)) { + continue; + } + + auto const& deviceMetrics = deviceMetricsInfo[di]; + auto const nMetrics = std::min({deviceMetrics.metrics.size(), + deviceMetrics.metricLabels.size(), + deviceMetrics.changed.size()}); + + auto monitoring = o2::monitoring::MonitoringFactory::Get(mBackend); + if (monitoring == nullptr) { + LOGP(error, "[MetricAggregator] Failed to create monitoring backend for '{}'", mBackend); + return; + } + monitoring->enableBuffering(MONITORING_QUEUE_SIZE); + monitoring->addGlobalTag("pipeline_id", std::to_string(specs[di].inputTimesliceId)); + monitoring->addGlobalTag("dataprocessor_name", specs[di].name); + // FIXME: dpl_instance missing + + for (std::size_t mi = 0; mi < nMetrics; ++mi) { + MetricSample sample; + if (!extractNumericSample(deviceMetrics, mi, 0, sample)) { + continue; + } + + auto const metricName = std::string(deviceMetrics.metricLabels[mi].label, deviceMetrics.metricLabels[mi].size); + auto tp = std::chrono::time_point(std::chrono::milliseconds(sample.timestamp)); + auto metric = o2::monitoring::Metric{metricName, o2::monitoring::Metric::DefaultVerbosity, tp}; + metric.addValue(sample.value, "value"); + monitoring->send(std::move(metric)); + } + monitoring->flushBuffer(); + } +} + +void MetricAggregator::flushMetrics(const std::vector& deviceMetrics, + const DeviceMetricsInfo& driverMetrics, + const std::vector& specs) +{ + auto const nDevices = std::min(deviceMetrics.size(), specs.size()); + if (nDevices == 0) { + mMonitoring->flushBuffer(); + return; + } + + // Collect all unique metric names across devices to determine which metrics to aggregate. + std::unordered_set allMetricNames; + for (const auto& deviceMetricsInfo : deviceMetrics) { + auto const nMetrics = std::min(deviceMetricsInfo.metrics.size(), deviceMetricsInfo.metricLabels.size()); + for (std::size_t i = 0; i < nMetrics; ++i) { + allMetricNames.insert(std::string_view(deviceMetricsInfo.metricLabels[i].label, deviceMetricsInfo.metricLabels[i].size)); + } + } + + for (const auto& metricName : allMetricNames) { + try { + auto const metricAggregationType = mPolicy->getAggregationTypeForMetric(metricName); + if (metricAggregationType == AggregationMetricType::Simple) { + continue; + } + + // Gather the latest metric values from each valid device + std::vector deviceSamples; + deviceSamples.reserve(nDevices); + auto metricTimestamp = std::numeric_limits::max(); + for (std::size_t di = 0; di < nDevices; ++di) { + auto const& deviceId = specs[di].id; + if (mPolicy && !mPolicy->selectDevice(deviceId)) { + continue; + } + auto const& deviceMetricsInfo = deviceMetrics[di]; + std::size_t deviceMetricIndex = 0; + if (!findMetricIndexByName(deviceMetricsInfo, metricName, deviceMetricIndex)) { + continue; + } + + MetricSample latest; + if (!extractNumericSample(deviceMetricsInfo, deviceMetricIndex, 0, latest) || latest.timestamp == 0) { + continue; + } + deviceSamples.push_back(latest); + metricTimestamp = std::min(metricTimestamp, latest.timestamp); + } + + if (deviceSamples.empty()) { + continue; + } + + const auto handlers = std::unordered_map&)>>{ + {AggregationMetricType::Sum, [](const auto& windows) { + double sum = 0.0; + for (const auto& window : windows) { + sum += window.value; + } + return sum; + }}, + {AggregationMetricType::Average, [](const auto& windows) { + double sum = 0.0; + std::size_t count = 0; + for (const auto& window : windows) { + sum += window.value; + ++count; + } + return windows.empty() ? 0.0 : sum / count; + }}, + {AggregationMetricType::Rate, [this, &metricName](const auto& windows) { + double sumRate = 0.0; + auto const& previousSamples = mLastSentSamples[std::string{metricName}]; + if (mLastSentSamples.empty() || previousSamples.size() != windows.size()) { + return 0.0; + } + for (const auto& window : windows) { + double deltaValue = window.value - previousSamples[&window - &windows[0]].value; + double deltaTimeSec = (window.timestamp - previousSamples[&window - &windows[0]].timestamp) / 1000.0; + if (deltaTimeSec <= 0) { + continue; + } + sumRate += deltaValue / deltaTimeSec; + } + return sumRate; + }} + }; + + double reducedValue = 0.0; + auto handlerIt = handlers.find(metricAggregationType); + if (handlerIt == handlers.end()) { + LOGP(error, "[MetricAggregator] No handler found for aggregation type '{}'", static_cast(metricAggregationType)); + continue; + } + reducedValue = handlerIt->second(deviceSamples); + if (reducedValue < 0) { + continue; + } + + auto tp = std::chrono::time_point(std::chrono::milliseconds(metricTimestamp)); + std::string metricNameWithPolicy = getMetricNameFromPolicy(metricName, metricAggregationType); + auto metric = Metric{metricNameWithPolicy, Metric::DefaultVerbosity, tp}; + metric.addValue(reducedValue, "value"); + mMonitoring->send(std::move(metric)); + mLastSentSamples[std::string{metricName}] = deviceSamples; + } catch (const std::exception& e) { + LOGP(error, "[MetricAggregator] Error determining aggregation type: {}",e.what()); + continue; + } + } + mMonitoring->flushBuffer(); +} + +const char* MetricAggregator::getBackendFromEnv() +{ + const char* envBackend = std::getenv("APMON_CONFIG"); + if (envBackend == nullptr) { + LOGP(error, "[MetricAggregator] APMON_CONFIG environment variable is not set"); + return nullptr; + } + + static std::string result = std::string("apmon://") + envBackend; + return result.c_str(); +} + +} // namespace o2::framework::metricaggregator diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index c58f8e7287b3b..754fffb68408a 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -89,6 +89,7 @@ #include #include #include "ResourcesMonitoringHelper.h" +#include "Framework/MetricAggregator.h" #include #include @@ -157,6 +158,7 @@ CPU = 0; \ } #endif +#define O2_METRICAGGREGATOR_INTERVAL 30 using namespace o2::monitoring; using namespace o2::configuration; @@ -1265,6 +1267,13 @@ void dumpMetricsCallback(uv_timer_t* handle) file); } +void aggregateMetricsCallback(uv_timer_t* handle) +{ + auto* context = (DriverServerContext*)handle->data; + static o2::framework::metricaggregator::MetricAggregator aggregator; + aggregator.mergeMetrics(*(context->metrics), context->driver->metrics, *(context->specs)); +} + void dumpRunSummary(DriverServerContext& context, DriverInfo const& driverInfo, DeviceInfos const& infos, DeviceSpecs const& specs) { if (infos.empty()) { @@ -1532,6 +1541,8 @@ int runStateMachine(DataProcessorSpecs const& workflow, uv_timer_t metricDumpTimer; metricDumpTimer.data = &serverContext; + uv_timer_t aggregationMetricTimer; + aggregationMetricTimer.data = &serverContext; bool allChildrenGone = false; guiContext.allChildrenGone = &allChildrenGone; O2_SIGNPOST_ID_FROM_POINTER(sid, driver, loop); @@ -2148,6 +2159,12 @@ int runStateMachine(DataProcessorSpecs const& workflow, driverInfo.resourcesMonitoringDumpInterval * 1000, driverInfo.resourcesMonitoringDumpInterval * 1000); } + + if (std::getenv("ALIEN_JDL_AGGREGATOR_POLICY") != nullptr) { + uv_timer_init(loop, &aggregationMetricTimer); + uv_timer_start(&aggregationMetricTimer, aggregateMetricsCallback, O2_METRICAGGREGATOR_INTERVAL * 1000, O2_METRICAGGREGATOR_INTERVAL * 1000); + } + /// Set the value for the severity of displayed logs to the command line value --severity for (const auto& processorInfo : dataProcessorInfos) { const auto& cmdLineArgs = processorInfo.cmdLineArgs; @@ -2292,8 +2309,12 @@ int runStateMachine(DataProcessorSpecs const& workflow, if (driverInfo.resourcesMonitoringDumpInterval) { uv_timer_stop(&metricDumpTimer); } + if (std::getenv("ALIEN_JDL_AGGREGATOR_POLICY") != nullptr) { + uv_timer_stop(&aggregationMetricTimer); + } LOGP(info, "Dumping performance metrics to {}.json file", driverInfo.resourcesMonitoringFilename); dumpMetricsCallback(&metricDumpTimer); + aggregateMetricsCallback(&aggregationMetricTimer); } dumpRunSummary(serverContext, driverInfo, infos, runningWorkflow.devices); // This is a clean exit. Before we do so, if required,