Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 70 additions & 71 deletions Framework/Core/src/MetricAggregator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ namespace o2::framework::metricaggregator
/// Reads the most recent value from the ring buffer based on the write position and the history offset.
template <typename ValueT>
bool getNumericValue(std::size_t storeIdx,
std::size_t writePos,
std::size_t filledMetrics,
std::size_t historyOffset,
std::vector<MetricsStorage<ValueT>> const& valuesStorage,
std::vector<TimestampsStorage<ValueT>> const& timestampsStorage,
MetricSample& out)
std::size_t writePos,
std::size_t filledMetrics,
std::size_t historyOffset,
std::vector<MetricsStorage<ValueT>> const& valuesStorage,
std::vector<TimestampsStorage<ValueT>> const& timestampsStorage,
MetricSample& out)
{
if (storeIdx >= valuesStorage.size() || storeIdx >= timestampsStorage.size()) {
return false;
Expand All @@ -70,9 +70,9 @@ bool getNumericValue(std::size_t storeIdx,

/// Routes the metric extraction request to the correct typed storage array.
bool extractNumericSample(DeviceMetricsInfo const& info,
std::size_t metricIndex,
std::size_t historyOffset,
MetricSample& out)
std::size_t metricIndex,
std::size_t historyOffset,
MetricSample& out)
{
if (metricIndex >= info.metrics.size()) {
return false;
Expand All @@ -82,39 +82,39 @@ bool extractNumericSample(DeviceMetricsInfo const& info,
switch (metric.type) {
case MetricType::Int: {
return getNumericValue(metric.storeIdx,
metric.pos,
metric.filledMetrics,
historyOffset,
info.intMetrics,
info.intTimestamps,
out);
metric.pos,
metric.filledMetrics,
historyOffset,
info.intMetrics,
info.intTimestamps,
out);
}
case MetricType::Uint64: {
return getNumericValue(metric.storeIdx,
metric.pos,
metric.filledMetrics,
historyOffset,
info.uint64Metrics,
info.uint64Timestamps,
out);
metric.pos,
metric.filledMetrics,
historyOffset,
info.uint64Metrics,
info.uint64Timestamps,
out);
}
case MetricType::Float: {
return getNumericValue(metric.storeIdx,
metric.pos,
metric.filledMetrics,
historyOffset,
info.floatMetrics,
info.floatTimestamps,
out);
metric.pos,
metric.filledMetrics,
historyOffset,
info.floatMetrics,
info.floatTimestamps,
out);
}
case MetricType::Enum: {
return getNumericValue(metric.storeIdx,
metric.pos,
metric.filledMetrics,
historyOffset,
info.enumMetrics,
info.enumTimestamps,
out);
metric.pos,
metric.filledMetrics,
historyOffset,
info.enumMetrics,
info.enumTimestamps,
out);
}
case MetricType::String:
case MetricType::Unknown:
Expand Down Expand Up @@ -202,10 +202,10 @@ void MetricAggregator::mergeMetrics(const std::vector<DeviceMetricsInfo>& metric
const std::vector<DeviceSpec>& specs)
{
if (mPolicy->getReduction() == AggregationMetricType::Simple) {
flushMetricsSimple(metrics, driverMetrics, specs);
} else {
flushMetrics(metrics, driverMetrics, specs);
}
flushMetricsSimple(metrics, driverMetrics, specs);
} else {
flushMetrics(metrics, driverMetrics, specs);
}
}

std::string MetricAggregator::getMetricNameFromPolicy(std::string_view metricName, AggregationMetricType aggregationType)
Expand All @@ -222,8 +222,8 @@ std::string MetricAggregator::getMetricNameFromPolicy(std::string_view metricNam
}

void MetricAggregator::flushMetricsSimple(const std::vector<DeviceMetricsInfo>& deviceMetricsInfo,
const DeviceMetricsInfo& driverMetrics,
const std::vector<DeviceSpec>& specs)
const DeviceMetricsInfo& driverMetrics,
const std::vector<DeviceSpec>& specs)
{
auto const nDevices = std::min(deviceMetricsInfo.size(), specs.size());
if (nDevices == 0) {
Expand Down Expand Up @@ -322,40 +322,39 @@ void MetricAggregator::flushMetrics(const std::vector<DeviceMetricsInfo>& device
}

const auto handlers = std::unordered_map<AggregationMetricType,
std::function<double(const std::vector<MetricSample>&)>>{
std::function<double(const std::vector<MetricSample>&)>>{
{AggregationMetricType::Sum, [](const auto& windows) {
double sum = 0.0;
for (const auto& window : windows) {
sum += window.value;
}
return sum;
}},
double sum = 0.0;
for (const auto& window : windows) {
sum += window.value;
}
return sum;
}},
{AggregationMetricType::Average, [](const auto& windows) {
double sum = 0.0;
std::size_t count = 0;
for (const auto& window : windows) {
sum += window.value;
++count;
}
return windows.empty() ? 0.0 : sum / count;
}},
double sum = 0.0;
std::size_t count = 0;
for (const auto& window : windows) {
sum += window.value;
++count;
}
return windows.empty() ? 0.0 : sum / count;
}},
{AggregationMetricType::Rate, [this, &metricName](const auto& windows) {
double sumRate = 0.0;
auto const& previousSamples = mLastSentSamples[std::string{metricName}];
if (mLastSentSamples.empty() || previousSamples.size() != windows.size()) {
return 0.0;
}
for (const auto& window : windows) {
double deltaValue = window.value - previousSamples[&window - &windows[0]].value;
double deltaTimeSec = (window.timestamp - previousSamples[&window - &windows[0]].timestamp) / 1000.0;
if (deltaTimeSec <= 0) {
continue;
}
sumRate += deltaValue / deltaTimeSec;
}
return sumRate;
}}
};
double sumRate = 0.0;
auto const& previousSamples = mLastSentSamples[std::string{metricName}];
if (mLastSentSamples.empty() || previousSamples.size() != windows.size()) {
return 0.0;
}
for (const auto& window : windows) {
double deltaValue = window.value - previousSamples[&window - &windows[0]].value;
double deltaTimeSec = (window.timestamp - previousSamples[&window - &windows[0]].timestamp) / 1000.0;
if (deltaTimeSec <= 0) {
continue;
}
sumRate += deltaValue / deltaTimeSec;
}
return sumRate;
}}};

double reducedValue = 0.0;
auto handlerIt = handlers.find(metricAggregationType);
Expand All @@ -375,7 +374,7 @@ void MetricAggregator::flushMetrics(const std::vector<DeviceMetricsInfo>& device
mMonitoring->send(std::move(metric));
mLastSentSamples[std::string{metricName}] = deviceSamples;
} catch (const std::exception& e) {
LOGP(error, "[MetricAggregator] Error determining aggregation type: {}",e.what());
LOGP(error, "[MetricAggregator] Error determining aggregation type: {}", e.what());
continue;
}
}
Expand Down