Function bodies 23 total
config.LogConfig.SlogLevel method · go · L44-L55 (12 LOC)config/config.go
func (lc *LogConfig) SlogLevel() slog.Level {
switch strings.ToLower(lc.Level) {
case "debug":
return slog.LevelDebug
case "warn":
return slog.LevelWarn
case "error":
return slog.LevelError
default:
return slog.LevelInfo
}
}config.Load function · go · L58-L76 (19 LOC)config/config.go
func Load(path string) (*Config, error) {
data, err := os.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to read config file: %w", err)
}
content := os.ExpandEnv(string(data))
var cfg Config
if err := yaml.Unmarshal([]byte(content), &cfg); err != nil {
return nil, fmt.Errorf("failed to parse config file: %w", err)
}
if err := cfg.validate(); err != nil {
return nil, fmt.Errorf("config validation failed: %w", err)
}
return &cfg, nil
}config.Config.validate method · go · L78-L112 (35 LOC)config/config.go
func (c *Config) validate() error {
if c.Server.Port <= 0 || c.Server.Port > 65535 {
return fmt.Errorf("invalid port number: %d", c.Server.Port)
}
if c.Metrics.TTLHours <= 0 {
c.Metrics.TTLHours = 48
}
if c.Metrics.CleanupIntervalMinutes <= 0 {
c.Metrics.CleanupIntervalMinutes = 60
}
if c.Auth.APIKey == "" {
return fmt.Errorf("auth.api_key is required (set HEALTH_INGEST_API_KEY environment variable)")
}
switch strings.ToLower(c.Log.Level) {
case "debug", "info", "warn", "error":
// OK
default:
return fmt.Errorf("invalid log level: %q", c.Log.Level)
}
switch strings.ToLower(c.Log.Format) {
case "json", "text":
// OK
case "":
c.Log.Format = "json"
default:
return fmt.Errorf("invalid log format: %q (must be json or text)", c.Log.Format)
}
return nil
}converter.Convert function · go · L48-L153 (106 LOC)converter/health_export.go
func Convert(payload []byte) ([]store.MetricSample, error) {
if len(payload) == 0 {
return nil, fmt.Errorf("empty payload")
}
metrics, err := parseMetrics(payload)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON: %w", err)
}
if len(metrics) == 0 {
return nil, fmt.Errorf("no metrics found")
}
var samples []store.MetricSample
for _, m := range metrics {
mapping, ok := AllowedMetrics[m.Name]
if !ok {
slog.Debug("skipping unknown metric", "name", m.Name)
continue
}
for _, dp := range m.Data {
ts := parseTimestamp(dp.Date)
// base value (qty or value)
var baseValue *float64
if dp.Qty != nil {
baseValue = dp.Qty
} else if dp.Value != nil {
baseValue = dp.Value
}
if baseValue != nil {
labels := map[string]string{}
if dp.Stage != "" {
labels["stage"] = dp.Stage
}
samples = append(samples, store.MetricSample{
Name: mapping.PrometheusName,
Labels: labels,
Value: *baseValconverter.parseMetrics function · go · L156-L176 (21 LOC)converter/health_export.go
func parseMetrics(payload []byte) ([]rawMetric, error) {
// 1. {"data": {"metrics": [...]}} format
var wrapped wrappedPayload
if err := json.Unmarshal(payload, &wrapped); err == nil && len(wrapped.Data.Metrics) > 0 {
return wrapped.Data.Metrics, nil
}
// 2. {"metrics": [...]} format
var mp metricsPayload
if err := json.Unmarshal(payload, &mp); err == nil && len(mp.Metrics) > 0 {
return mp.Metrics, nil
}
// 3. direct array [...] format
var arr []rawMetric
if err := json.Unmarshal(payload, &arr); err == nil && len(arr) > 0 {
return arr, nil
}
return nil, fmt.Errorf("unrecognized JSON format")
}converter.parseTimestamp function · go · L179-L197 (19 LOC)converter/health_export.go
func parseTimestamp(s string) time.Time {
// "2024-01-01 12:00:00 +0900" format
formats := []string{
"2006-01-02 15:04:05 -0700",
"2006-01-02T15:04:05Z07:00",
"2006-01-02T15:04:05Z",
"2006-01-02 15:04:05",
"2006-01-02",
}
for _, f := range formats {
if t, err := time.Parse(f, s); err == nil {
return t
}
}
slog.Warn("failed to parse timestamp, using current time", "raw", s)
return time.Now()
}converter.init function · go · L34-L39 (6 LOC)converter/metric_names.go
func init() {
PromNameToMapping = make(map[string]MetricMapping, len(AllowedMetrics))
for _, m := range AllowedMetrics {
PromNameToMapping[m.PrometheusName] = m
}
}Open data scored by Repobility · https://repobility.com
handlers.NewHealthHandler function · go · L8-L19 (12 LOC)handlers/health.go
func NewHealthHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
sendError(w, http.StatusMethodNotAllowed, "method not allowed, GET required")
return
}
sendJSON(w, http.StatusOK, map[string]interface{}{
"status": "ok",
})
}
}handlers.NewIngestHandler function · go · L17-L58 (42 LOC)handlers/ingest.go
func NewIngestHandler(ms *store.MetricsStore) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
sendError(w, http.StatusMethodNotAllowed, "method not allowed, POST required")
return
}
ct := r.Header.Get("Content-Type")
if ct != "" {
mediaType, _, err := mime.ParseMediaType(ct)
if err != nil || mediaType != "application/json" {
sendError(w, http.StatusUnsupportedMediaType, "unsupported media type, application/json required")
return
}
}
defer r.Body.Close()
body, err := io.ReadAll(io.LimitReader(r.Body, maxBodySize+1))
if err != nil {
sendError(w, http.StatusBadRequest, fmt.Sprintf("failed to read request body: %v", err))
return
}
if int64(len(body)) > maxBodySize {
sendError(w, http.StatusRequestEntityTooLarge, "request body exceeds 10MB limit")
return
}
samples, err := converter.Convert(body)
if err != nil {
sendError(w, http.StatusBadRequest, fmt.Sprintf("failed handlers.NewMetricsHandler function · go · L14-L92 (79 LOC)handlers/metrics.go
func NewMetricsHandler(ms *store.MetricsStore) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
sendError(w, http.StatusMethodNotAllowed, "method not allowed, GET required")
return
}
samples := ms.GetAll()
lastReceived, totalSamples, activeMetrics := ms.GetStats()
var b strings.Builder
// Operational metrics
b.WriteString("# HELP health_ingest_last_received_timestamp Unix timestamp of last data received\n")
b.WriteString("# TYPE health_ingest_last_received_timestamp gauge\n")
if !lastReceived.IsZero() {
fmt.Fprintf(&b, "health_ingest_last_received_timestamp %d\n", lastReceived.Unix())
} else {
b.WriteString("health_ingest_last_received_timestamp 0\n")
}
b.WriteString("# HELP health_ingest_samples_total Total number of samples received\n")
b.WriteString("# TYPE health_ingest_samples_total counter\n")
fmt.Fprintf(&b, "health_ingest_samples_total %d\n", totalSamples)
b.WriteString("# HEhandlers.formatSample function · go · L95-L117 (23 LOC)handlers/metrics.go
func formatSample(s store.MetricSample) string {
if len(s.Labels) == 0 {
return fmt.Sprintf("%s %g %d", s.Name, s.Value, s.Timestamp.UnixMilli())
}
keys := make([]string, 0, len(s.Labels))
for k := range s.Labels {
keys = append(keys, k)
}
sort.Strings(keys)
var lb strings.Builder
lb.WriteByte('{')
for i, k := range keys {
if i > 0 {
lb.WriteByte(',')
}
fmt.Fprintf(&lb, "%s=%q", k, s.Labels[k])
}
lb.WriteByte('}')
return fmt.Sprintf("%s%s %g %d", s.Name, lb.String(), s.Value, s.Timestamp.UnixMilli())
}handlers.sendJSON function · go · L10-L17 (8 LOC)handlers/response.go
func sendJSON(w http.ResponseWriter, statusCode int, data interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
if err := json.NewEncoder(w).Encode(data); err != nil {
slog.Error("failed to encode JSON response", "error", err)
}
}handlers.sendError function · go · L20-L25 (6 LOC)handlers/response.go
func sendError(w http.ResponseWriter, statusCode int, message string) {
sendJSON(w, statusCode, map[string]interface{}{
"success": false,
"message": message,
})
}handlers.NewStatusHandler function · go · L11-L36 (26 LOC)handlers/status.go
func NewStatusHandler(ms *store.MetricsStore) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
sendError(w, http.StatusMethodNotAllowed, "method not allowed, GET required")
return
}
lastReceived, totalSamples, activeMetrics := ms.GetStats()
lastReceivedStr := ""
if !lastReceived.IsZero() {
lastReceivedStr = lastReceived.Format(time.RFC3339)
}
sendJSON(w, http.StatusOK, map[string]interface{}{
"success": true,
"message": "Service status",
"data": map[string]interface{}{
"last_received": lastReceivedStr,
"total_samples": totalSamples,
"active_metrics": activeMetrics,
"timestamp": time.Now().Format(time.RFC3339),
},
})
}
}main.main function · go · L23-L106 (84 LOC)main.go
func main() {
configPath := flag.String("config", "config/config.yaml", "path to config file")
showVersion := flag.Bool("version", false, "show version")
flag.Parse()
if *showVersion {
fmt.Printf("Health Ingest Service v%s\n", Version)
return
}
// Bootstrap logger with JSON/stdout defaults (before config is available)
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))
cfg, err := config.Load(*configPath)
if err != nil {
slog.Error("failed to load config", "error", err)
os.Exit(1)
}
// Re-initialize logger with configured level and format
initLogger(&cfg.Log)
slog.Info("starting service", "version", Version)
// Initialize metrics store
ms := store.NewMetricsStore()
// Set up routes
mux := http.NewServeMux()
mux.HandleFunc("/api/ingest", middleware.BearerAuth(cfg.Auth.APIKey, handlers.NewIngestHandler(ms)))
mux.HandleFunc("/api/health", handlers.NewHealthHandler())
mux.HandleFunc("/metrics", handlers.NewMetricsHandler(ms))
mux.HandleFunc("/If a scraper extracted this row, it came from Repobility (https://repobility.com)
main.initLogger function · go · L108-L121 (14 LOC)main.go
func initLogger(logCfg *config.LogConfig) {
level := logCfg.SlogLevel()
opts := &slog.HandlerOptions{Level: level}
var handler slog.Handler
switch strings.ToLower(logCfg.Format) {
case "text":
handler = slog.NewTextHandler(os.Stdout, opts)
default:
handler = slog.NewJSONHandler(os.Stdout, opts)
}
slog.SetDefault(slog.New(handler))
}middleware.BearerAuth function · go · L12-L42 (31 LOC)middleware/auth.go
func BearerAuth(apiKey string, next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Skip auth when API key is empty (development mode)
if apiKey == "" {
next(w, r)
return
}
auth := r.Header.Get("Authorization")
if auth == "" {
slog.Warn("missing authorization header", "method", r.Method, "path", r.URL.Path)
unauthorized(w)
return
}
if !strings.HasPrefix(auth, "Bearer ") {
slog.Warn("invalid authorization scheme", "method", r.Method, "path", r.URL.Path)
unauthorized(w)
return
}
token := strings.TrimPrefix(auth, "Bearer ")
if subtle.ConstantTimeCompare([]byte(token), []byte(apiKey)) != 1 {
slog.Warn("invalid token", "method", r.Method, "path", r.URL.Path)
unauthorized(w)
return
}
next(w, r)
}
}middleware.unauthorized function · go · L44-L51 (8 LOC)middleware/auth.go
func unauthorized(w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusUnauthorized)
json.NewEncoder(w).Encode(map[string]interface{}{
"success": false,
"message": "Unauthorized",
})
}store.buildKey function · go · L36-L58 (23 LOC)store/metrics_store.go
func buildKey(name string, labels map[string]string) string {
if len(labels) == 0 {
return name
}
keys := make([]string, 0, len(labels))
for k := range labels {
keys = append(keys, k)
}
sort.Strings(keys)
var b strings.Builder
b.WriteString(name)
b.WriteByte('{')
for i, k := range keys {
if i > 0 {
b.WriteByte(',')
}
fmt.Fprintf(&b, "%s=%q", k, labels[k])
}
b.WriteByte('}')
return b.String()
}store.MetricsStore.Update method · go · L61-L72 (12 LOC)store/metrics_store.go
func (s *MetricsStore) Update(samples []MetricSample) {
s.mu.Lock()
defer s.mu.Unlock()
for _, sample := range samples {
key := buildKey(sample.Name, sample.Labels)
s.samples[key] = sample
}
s.lastReceivedAt = time.Now()
s.totalSamples += int64(len(samples))
}store.MetricsStore.GetAll method · go · L75-L84 (10 LOC)store/metrics_store.go
func (s *MetricsStore) GetAll() []MetricSample {
s.mu.RLock()
defer s.mu.RUnlock()
result := make([]MetricSample, 0, len(s.samples))
for _, sample := range s.samples {
result = append(result, sample)
}
return result
}store.MetricsStore.CleanExpired method · go · L87-L97 (11 LOC)store/metrics_store.go
func (s *MetricsStore) CleanExpired(ttl time.Duration) {
s.mu.Lock()
defer s.mu.Unlock()
cutoff := time.Now().Add(-ttl)
for key, sample := range s.samples {
if sample.Timestamp.Before(cutoff) {
delete(s.samples, key)
}
}
}store.MetricsStore.GetStats method · go · L100-L105 (6 LOC)store/metrics_store.go
func (s *MetricsStore) GetStats() (lastReceived time.Time, totalSamples int64, activeMetrics int) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.lastReceivedAt, s.totalSamples, len(s.samples)
}