Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ o2_add_library(Framework
src/PluginManager.cxx
src/RateLimiter.cxx
src/ResourcesMonitoringHelper.cxx
src/AggregationPolicy.cxx
src/MetricAggregator.cxx
src/ResourcePolicy.cxx
src/ResourcePolicyHelpers.cxx
src/RootArrowFilesystem.cxx
Expand Down
82 changes: 82 additions & 0 deletions Framework/Core/include/Framework/AggregationPolicy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#ifndef O2_FRAMEWORK_METRICAGGREGATOR_AGGREGATIONPOLICY_H
#define O2_FRAMEWORK_METRICAGGREGATOR_AGGREGATIONPOLICY_H

#include <string>
#include <string_view>
#include <vector>
#include <regex>

namespace o2
{

namespace framework
{

namespace metricaggregator
{
/// Defines the selection strategy for devices.
enum class AggregationSelectionType {
All,
Specific
};
/// Defines the reduction strategy for metrics.
enum class AggregationMetricType {
Sum,
Average,
Rate,
Specific,
Simple
};

/// Parses environment configurations and evaluates aggregation rules.
class AggregationPolicy
{
public:
AggregationPolicy() = default;
~AggregationPolicy() = default;
/// Reads configuration from environment variables and sets internal rules.
void configureFromEnv();
/// Returns the configured device selection type.
AggregationSelectionType getSelection() const noexcept;
/// Returns the configured global metric reduction type.
AggregationMetricType getReduction() const noexcept;
/// Determines the specific reduction type required for a given metric name.
AggregationMetricType getAggregationTypeForMetric(std::string_view metricName) const;
/// Evaluates whether the policy allows processing for the provided device name.
bool selectDevice(std::string_view deviceId) const;

private:
/// Maps a regular expression pattern to a specific aggregation type.
struct MetricRule {
std::regex metricPattern;
AggregationMetricType type;
};

std::vector<std::string> split(std::string_view input, char delim) const;
/// Converts a string literal into an AggregationSelectionType enum.
AggregationSelectionType parseSelectionType(const std::string& str);
/// Converts a string literal into an AggregationMetricType enum.
AggregationMetricType parseReductionType(const std::string& str);

AggregationSelectionType mSelection = AggregationSelectionType::All;
AggregationMetricType mReduction = AggregationMetricType::Sum;
std::vector<std::string> mSpecificDevices;
std::vector<MetricRule> mSpecificMetricRules;
};

} // namespace metricaggregator
} // namespace framework
} // namespace o2

#endif // O2_FRAMEWORK_METRICAGGREGATOR_AGGREGATIONPOLICY_H
81 changes: 81 additions & 0 deletions Framework/Core/include/Framework/MetricAggregator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#ifndef O2_FRAMEWORK_METRICAGGREGATOR_METRICAGGREGATOR_H
#define O2_FRAMEWORK_METRICAGGREGATOR_METRICAGGREGATOR_H

#include "Framework/ServiceHandle.h"
#include "Framework/ServiceMetricsInfo.h"
#include "Framework/Monitoring.h"

#include <fairmq/ProgOptions.h>
#include <memory>
#include <string>
#include <vector>
#include <unordered_map>

#include "Framework/AggregationPolicy.h"

namespace o2
{

namespace framework
{

namespace metricaggregator
{
/// Stores a single numeric measurement and its associated timestamp.
struct MetricSample {
double value = 0.0;
std::size_t timestamp = 0;
};

/// Collects and reduces metrics across multiple framework devices.
/// Transmits the aggregated results to an external monitoring backend.
class MetricAggregator
{
public:
explicit MetricAggregator();
~MetricAggregator() = default;
/// Initializes the internal aggregation policy from environment variables.
void setPolicy();
/// Returns the current policy configuration as a formatted string.
std::string getPolicy();
/// Routes metrics to the appropriate processing function based on the policy reduction type.
void mergeMetrics(const std::vector<DeviceMetricsInfo>& metrics,
const DeviceMetricsInfo& driverMetrics,
const std::vector<DeviceSpec>& specs);

private:
/// Appends a suffix to the metric name based on the applied aggregation type.
std::string getMetricNameFromPolicy(std::string_view metricName, AggregationMetricType aggregationType);
/// Flushes metrics directly without aggregation.
void flushMetricsSimple(const std::vector<DeviceMetricsInfo>& deviceMetrics,
const DeviceMetricsInfo& driverMetrics,
const std::vector<DeviceSpec>& specs);
/// Flushes metrics by applying the aggregation policy.
void flushMetrics(const std::vector<DeviceMetricsInfo>& deviceMetrics,
const DeviceMetricsInfo& driverMetrics,
const std::vector<DeviceSpec>& specs);
/// Retrieves the monitoring backend type from environment variables.
const char* getBackendFromEnv();

const char* mBackend = nullptr;
std::unique_ptr<o2::monitoring::Monitoring> mMonitoring;
/// Stores the previous samples required to compute rates over time.
std::unordered_map<std::string, std::vector<MetricSample>> mLastSentSamples;
std::unique_ptr<AggregationPolicy> mPolicy;
};
} // namespace metricaggregator
} // namespace framework
} // namespace o2

#endif // O2_FRAMEWORK_METRICAGGREGATOR_METRICAGGREGATOR_H
149 changes: 149 additions & 0 deletions Framework/Core/src/AggregationPolicy.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Framework/AggregationPolicy.h"
#include "Framework/Logger.h"

#include <algorithm>
#include <cstdlib>
#include <sstream>
#include <string>
#include <vector>
#include <regex>
#include <stdexcept>

using namespace o2::framework::metricaggregator;

std::vector<std::string> AggregationPolicy::split(std::string_view input, char delim) const
{
std::vector<std::string> tokens;
std::string token;
std::istringstream tokenStream{std::string(input)};
while (std::getline(tokenStream, token, delim)) {
tokens.push_back(token);
}
return tokens;
}

void AggregationPolicy::configureFromEnv()
{
const char* envPolicy = std::getenv("ALIEN_JDL_AGGREGATOR_POLICY");
if (!envPolicy) {
LOGP(warn, "[AggregationPolicy] ALIEN_JDL_AGGREGATOR_POLICY is not set. Using default 'all:simple'.");
mSelection = AggregationSelectionType::All;
mReduction = AggregationMetricType::Simple;
return;
}

try {
std::string policyStr(envPolicy);
std::vector<std::string> parts = split(policyStr, ':');

if (parts.size() < 2) {
LOGP(error, "[AggregationPolicy] Invalid ALIEN_JDL_AGGREGATOR_POLICY format");
return;
}

mSelection = parseSelectionType(parts[0]);
mReduction = parseReductionType(parts[1]);

if (mSelection == AggregationSelectionType::Specific) {
const char* envDevices = std::getenv("ALIEN_JDL_AGGREGATOR_DEVICES");
if (!envDevices) {
throw std::invalid_argument("ALIEN_JDL_AGGREGATOR_DEVICES environment variable is required when selection type is 'specific'");
}
mSpecificDevices = split(std::string(envDevices), ',');
}
if (mReduction == AggregationMetricType::Specific) {
const char* envMetrics = std::getenv("ALIEN_JDL_AGGREGATOR_METRICS");
if (!envMetrics) {
LOGP(warn, "[AggregationPolicy] ALIEN_JDL_AGGREGATOR_METRICS environment variable missing for 'specific' reduction type. Using default.");
mSpecificMetricRules.push_back({std::regex(".*"), AggregationMetricType::Sum});
return;
}

std::stringstream metricsStream(envMetrics);
std::string metricRuleStr;
while (std::getline(metricsStream, metricRuleStr, ';')) {
auto pos = metricRuleStr.find(':');
if (pos == std::string::npos) {
throw std::invalid_argument("Invalid metric rule format: " + metricRuleStr);
}
std::string typeStr = metricRuleStr.substr(0, pos);
std::string patternStr = metricRuleStr.substr(pos + 1);
AggregationMetricType type = parseReductionType(typeStr);
mSpecificMetricRules.push_back({std::regex(patternStr), type});
}
}
} catch (std::exception const& e) {
LOGP(error, "[AggregationPolicy] Failed to parse ALIEN_JDL_AGGREGATOR_POLICY: {}", e.what());
}
}

AggregationMetricType AggregationPolicy::getAggregationTypeForMetric(std::string_view metricName) const
{
if (mReduction != AggregationMetricType::Specific) {
return mReduction;
}
for (const auto& rule : mSpecificMetricRules) {
if (std::regex_match(std::string(metricName), rule.metricPattern)) {
return rule.type;
}
}
if (mReduction == AggregationMetricType::Specific) {
LOGP(error, "[AggregationPolicy] No specific aggregation type found for metric '{}'", metricName);
}
throw std::invalid_argument("No specific aggregation type found for metric: " + std::string(metricName));
}

AggregationSelectionType AggregationPolicy::getSelection() const noexcept
{
return mSelection;
}

AggregationMetricType AggregationPolicy::getReduction() const noexcept
{
return mReduction;
}

AggregationSelectionType AggregationPolicy::parseSelectionType(const std::string& str)
{
if (str == "all") {
return AggregationSelectionType::All;
} else if (str == "specific") {
return AggregationSelectionType::Specific;
}
throw std::invalid_argument("Invalid selection type: " + str);
}

AggregationMetricType AggregationPolicy::parseReductionType(const std::string& str)
{
if (str == "sum") {
return AggregationMetricType::Sum;
} else if (str == "average") {
return AggregationMetricType::Average;
} else if (str == "rate") {
return AggregationMetricType::Rate;
} else if (str == "simple") {
return AggregationMetricType::Simple;
} else if (str == "specific") {
return AggregationMetricType::Specific;
}
throw std::invalid_argument("Invalid reduction type: " + str);
}

bool AggregationPolicy::selectDevice(std::string_view deviceId) const
{
if (mSelection == AggregationSelectionType::Specific) {
return std::find(mSpecificDevices.begin(), mSpecificDevices.end(), deviceId) != mSpecificDevices.end();
}
return true;
}
Loading
Loading