Prometheus pull metrics exporter (closes #275)

This commit is contained in:
mdecimus 2024-08-08 09:52:19 +02:00
parent 03307433a7
commit bba371c624
10 changed files with 227 additions and 16 deletions

15
Cargo.lock generated
View file

@ -1047,6 +1047,7 @@ dependencies = [
"parking_lot",
"pem",
"privdrop",
"prometheus",
"proxy-header",
"pwhash",
"rcgen 0.12.1",
@ -4603,6 +4604,20 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "prometheus"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot",
"thiserror",
]
[[package]]
name = "prost"
version = "0.13.1"

View file

@ -42,6 +42,7 @@ opentelemetry = { version = "0.24" }
opentelemetry_sdk = { version = "0.24" }
opentelemetry-otlp = { version = "0.17", features = ["http-proto", "reqwest-client"] }
opentelemetry-semantic-conventions = { version = "0.16.0" }
prometheus = { version = "0.13.4", default-features = false }
imagesize = "0.13"
sha1 = "0.10"
sha2 = "0.10.6"

View file

@ -110,10 +110,15 @@ pub struct Tracers {
#[derive(Debug, Clone, Default)]
pub struct Metrics {
pub prometheus: bool,
pub prometheus: Option<PrometheusMetrics>,
pub otel: Option<Arc<OtelMetrics>>,
}
#[derive(Debug, Clone, Default)]
pub struct PrometheusMetrics {
pub auth: Option<String>,
}
impl Telemetry {
pub fn parse(config: &mut Config) -> Self {
let mut telemetry = Telemetry {
@ -553,12 +558,25 @@ impl Tracers {
impl Metrics {
pub fn parse(config: &mut Config) -> Self {
let mut metrics = Metrics {
prometheus: config
.property_or_default("metrics.prometheus.enable", "true")
.unwrap_or(true),
prometheus: None,
otel: None,
};
if config
.property_or_default("metrics.prometheus.enable", "false")
.unwrap_or(false)
{
metrics.prometheus = Some(PrometheusMetrics {
auth: config
.value("metrics.prometheus.auth.username")
.and_then(|user| {
config
.value("metrics.prometheus.auth.secret")
.map(|secret| STANDARD.encode(format!("{user}:{secret}")))
}),
});
}
if config
.property_or_default("metrics.open-telemetry.enable", "false")
.unwrap_or(false)

View file

@ -5,3 +5,4 @@
*/
pub mod otel;
pub mod prometheus;

View file

@ -0,0 +1,124 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use prometheus::{
proto::{Bucket, Counter, Gauge, Histogram, Metric, MetricFamily, MetricType},
TextEncoder,
};
use trc::{atomic::AtomicHistogram, collector::Collector};
use crate::Core;
impl Core {
pub async fn export_prometheus_metrics(&self) -> trc::Result<String> {
let mut metrics = Vec::new();
#[cfg(feature = "enterprise")]
let is_enterprise = self.is_enterprise_edition();
#[cfg(not(feature = "enterprise"))]
let is_enterprise = false;
// Add counters
for counter in Collector::collect_counters(is_enterprise) {
let mut metric = MetricFamily::default();
metric.set_name(metric_name(counter.id()));
metric.set_help(counter.description().into());
metric.set_field_type(MetricType::COUNTER);
metric.set_metric(vec![new_counter(counter.get())]);
metrics.push(metric);
}
// Add event counters
for counter in Collector::collect_event_counters(is_enterprise) {
let mut metric = MetricFamily::default();
metric.set_name(metric_name(counter.id()));
metric.set_help(counter.description().into());
metric.set_field_type(MetricType::COUNTER);
metric.set_metric(vec![new_counter(counter.value())]);
metrics.push(metric);
}
// Add gauges
for gauge in Collector::collect_gauges(is_enterprise) {
let mut metric = MetricFamily::default();
metric.set_name(metric_name(gauge.id()));
metric.set_help(gauge.description().into());
metric.set_field_type(MetricType::GAUGE);
metric.set_metric(vec![new_gauge(gauge.get())]);
metrics.push(metric);
}
// Add histograms
for histogram in Collector::collect_histograms(is_enterprise) {
let mut metric = MetricFamily::default();
metric.set_name(metric_name(histogram.id()));
metric.set_help(histogram.description().into());
metric.set_field_type(MetricType::HISTOGRAM);
metric.set_metric(vec![new_histogram(histogram)]);
metrics.push(metric);
}
TextEncoder::new()
.encode_to_string(&metrics)
.map_err(|e| trc::EventType::Telemetry(trc::TelemetryEvent::OtelExpoterError).reason(e))
}
}
fn metric_name(id: impl AsRef<str>) -> String {
let id = id.as_ref();
let mut name = String::with_capacity(id.len());
for c in id.chars() {
if c.is_ascii_alphanumeric() {
name.push(c);
} else {
name.push('_');
}
}
name
}
fn new_counter(value: u64) -> Metric {
let mut m = Metric::default();
let mut counter = Counter::default();
counter.set_value(value as f64);
m.set_counter(counter);
m
}
fn new_gauge(value: u64) -> Metric {
let mut m = Metric::default();
let mut gauge = Gauge::default();
gauge.set_value(value as f64);
m.set_gauge(gauge);
m
}
fn new_histogram(histogram: &AtomicHistogram<12>) -> Metric {
let mut m = Metric::default();
let mut h = Histogram::default();
h.set_sample_count(histogram.count());
h.set_sample_sum(histogram.sum() as f64);
h.set_bucket(
histogram
.buckets_iter()
.into_iter()
.zip(histogram.upper_bounds_iter())
.map(|(count, upper_bound)| {
let mut b = Bucket::default();
b.set_cumulative_count(count);
b.set_upper_bound(if upper_bound != u64::MAX {
upper_bound as f64
} else {
f64::INFINITY
});
b
})
.collect(),
);
m.set_histogram(h);
m
}

View file

@ -29,7 +29,7 @@ use jmap_proto::{
};
use crate::{
auth::oauth::OAuthMetadata,
auth::{authenticate::HttpHeaders, oauth::OAuthMetadata},
blob::{DownloadResponse, UploadResponse},
services::state,
JmapInstance, JMAP,
@ -322,6 +322,33 @@ impl JMAP {
}
_ => (),
},
"metrics" => match path.next().unwrap_or_default() {
"prometheus" => {
if let Some(prometheus) = &self.core.metrics.prometheus {
if let Some(auth) = &prometheus.auth {
if req
.authorization_basic()
.map_or(true, |secret| secret != auth)
{
return Err(trc::AuthEvent::Failed
.into_err()
.details("Invalid or missing credentials.")
.caused_by(trc::location!()));
}
}
return Ok(Resource {
content_type: "text/plain; version=0.0.4",
contents: self.core.export_prometheus_metrics().await?.into_bytes(),
}
.into_http_response());
}
}
"otel" => {
// Reserved for future use
}
_ => (),
},
_ => {
let path = req.uri().path();
let resource = self

View file

@ -23,13 +23,8 @@ impl JMAP {
req: &hyper::Request<hyper::body::Incoming>,
session: &HttpSessionData,
) -> trc::Result<(InFlight, Arc<AccessToken>)> {
if let Some((mechanism, token)) = req
.headers()
.get(header::AUTHORIZATION)
.and_then(|h| h.to_str().ok())
.and_then(|h| h.split_once(' ').map(|(l, t)| (l, t.trim().to_string())))
{
let access_token = if let Some(account_id) = self.inner.sessions.get_with_ttl(&token) {
if let Some((mechanism, token)) = req.authorization() {
let access_token = if let Some(account_id) = self.inner.sessions.get_with_ttl(token) {
self.get_cached_access_token(account_id).await?
} else {
let access_token = if mechanism.eq_ignore_ascii_case("basic") {
@ -56,7 +51,7 @@ impl JMAP {
return Err(trc::AuthEvent::Error
.into_err()
.details("Failed to decode Basic auth request.")
.id(token)
.id(token.to_string())
.caused_by(trc::location!()));
}
} else if mechanism.eq_ignore_ascii_case("bearer") {
@ -64,7 +59,7 @@ impl JMAP {
self.is_anonymous_allowed(&session.remote_ip).await?;
let (account_id, _, _) =
self.validate_access_token("access_token", &token).await?;
self.validate_access_token("access_token", token).await?;
self.get_access_token(account_id).await?
} else {
@ -73,13 +68,13 @@ impl JMAP {
return Err(trc::AuthEvent::Error
.into_err()
.reason("Unsupported authentication mechanism.")
.details(token)
.details(token.to_string())
.caused_by(trc::location!()));
};
// Cache session
let access_token = Arc::new(access_token);
self.cache_session(token, &access_token);
self.cache_session(token.to_string(), &access_token);
self.cache_access_token(access_token.clone());
access_token
};
@ -186,3 +181,27 @@ impl JMAP {
}
}
}
pub trait HttpHeaders {
fn authorization(&self) -> Option<(&str, &str)>;
fn authorization_basic(&self) -> Option<&str>;
}
impl HttpHeaders for hyper::Request<hyper::body::Incoming> {
fn authorization(&self) -> Option<(&str, &str)> {
self.headers()
.get(header::AUTHORIZATION)
.and_then(|h| h.to_str().ok())
.and_then(|h| h.split_once(' ').map(|(l, t)| (l, t.trim())))
}
fn authorization_basic(&self) -> Option<&str> {
self.authorization().and_then(|(l, t)| {
if l.eq_ignore_ascii_case("basic") {
Some(t)
} else {
None
}
})
}
}

View file

@ -194,6 +194,10 @@ impl<const N: usize> AtomicHistogram<N> {
vec
}
pub fn buckets_len(&self) -> usize {
N
}
pub fn upper_bounds_iter(&self) -> impl IntoIterator<Item = u64> + '_ {
self.upper_bounds.iter().copied()
}

View file

@ -1912,6 +1912,7 @@ impl TelemetryEvent {
TelemetryEvent::JournalError => "Journal collector error",
TelemetryEvent::OtelExpoterError => "OpenTelemetry exporter error",
TelemetryEvent::OtelMetricsExporterError => "OpenTelemetry metrics exporter error",
TelemetryEvent::PrometheusExporterError => "Prometheus exporter error",
}
}
}

View file

@ -654,6 +654,7 @@ pub enum TelemetryEvent {
WebhookError,
OtelExpoterError,
OtelMetricsExporterError,
PrometheusExporterError,
JournalError,
}