matterbridge/vendor/github.com/mattermost/logr/v2/target.go

305 lines
6.6 KiB
Go
Raw Permalink Normal View History

2021-10-16 14:11:32 -07:00
package logr
import (
"context"
"errors"
"fmt"
"os"
"sync/atomic"
"time"
)
// Target represents a destination for log records such as file,
// database, TCP socket, etc.
//
// In this file a Target is driven by a TargetHost: newTargetHost calls Init
// before starting the host's read loop, the read loop calls Write for each
// formatted record, and TargetHost.Shutdown calls Shutdown.
type Target interface {
	// Init is called once to initialize the target.
	// A non-nil error causes newTargetHost to fail.
	Init() error

	// Write outputs to this target's destination.
	// p holds the formatted output for rec. TargetHost ignores the returned
	// byte count and only inspects the error.
	Write(p []byte, rec *LogRec) (int, error)

	// Shutdown is called once to free/close any resources.
	// Target queue is already drained when this is called.
	// NOTE(review): TargetHost.Shutdown also calls this when its context
	// expires before the read loop has exited, so the drained guarantee
	// may not hold in that case -- confirm against callers.
	Shutdown() error
}
// targetMetrics groups the per-target metric instruments obtained from the
// metrics collector in TargetHost.initMetrics.
type targetMetrics struct {
	// queueSizeGauge is periodically set to the number of records waiting
	// in the host's queue.
	queueSizeGauge Gauge
	// loggedCounter is incremented when a record is written without error.
	loggedCounter Counter
	// errorCounter is incremented when writing a record returns an error.
	errorCounter Counter
	// droppedCounter is incremented when the queue-full handler elects to
	// drop a record.
	droppedCounter Counter
	// blockedCounter is incremented when Log has to block because the
	// queue is full.
	blockedCounter Counter
}
// targetHostOptions carries the settings consumed by newTargetHost.
type targetHostOptions struct {
	// name identifies the host; when empty, newTargetHost substitutes the
	// target's type name (formatted with %T).
	name string
	// filter decides which levels are enabled; when nil, newTargetHost
	// substitutes &StdFilter{Lvl: Fatal}.
	filter Filter
	// formatter renders records to bytes; when nil, newTargetHost
	// substitutes &DefaultFormatter{}.
	formatter Formatter
	// maxQueueSize is the buffer capacity of the host's record channel.
	maxQueueSize int
	// metrics supplies the collector and update frequency; nil disables
	// metrics for the host.
	metrics *metrics
}
// TargetHost hosts and manages the lifecycle of a target.
// Incoming log records are queued and formatted before
// being passed to the target.
type TargetHost struct {
	// target is the destination that formatted records are written to.
	target Target
	// name is returned by String and used as the label when creating metrics.
	name string
	// filter determines which levels are enabled for this target.
	filter Filter
	// formatter converts a record into the bytes handed to target.Write.
	formatter Formatter
	// in is the buffered queue of records awaiting the read loop.
	in   chan *LogRec
	quit chan struct{} // closed by Shutdown to exit read loop
	done chan struct{} // closed when read loop exited
	// targetMetrics is nil when metrics are disabled.
	targetMetrics *targetMetrics
	// shutdown is accessed atomically; it is set to 1 by Shutdown, after
	// which Log discards incoming records.
	shutdown int32
}
// newTargetHost builds a TargetHost around target, applies defaults for any
// unset options, initializes metrics and the target, and starts the read
// loop goroutine.
//
// Defaults: an empty name becomes the target's type name, a nil filter
// becomes &StdFilter{Lvl: Fatal}, and a nil formatter becomes
// &DefaultFormatter{}.
//
// It returns the error from metrics initialization or from target.Init
// unchanged; in either case no host is returned and the read loop is not
// started.
func newTargetHost(target Target, options targetHostOptions) (*TargetHost, error) {
	host := &TargetHost{
		target:    target,
		name:      options.name,
		filter:    options.filter,
		formatter: options.formatter,
		in:        make(chan *LogRec, options.maxQueueSize),
		quit:      make(chan struct{}),
		done:      make(chan struct{}),
	}

	if host.name == "" {
		host.name = fmt.Sprintf("%T", target)
	}
	if host.filter == nil {
		host.filter = &StdFilter{Lvl: Fatal}
	}
	if host.formatter == nil {
		host.formatter = &DefaultFormatter{}
	}

	if err := host.initMetrics(options.metrics); err != nil {
		return nil, err
	}

	if err := target.Init(); err != nil {
		// initMetrics may already have started the metrics updater goroutine,
		// which only exits once done is closed. The read loop (the normal
		// closer of done) is never started on this path, so close it here to
		// keep that goroutine from leaking.
		close(host.done)
		return nil, err
	}

	go host.start()
	return host, nil
}
// initMetrics obtains this host's gauge and counters from the metrics
// collector and launches the goroutine that periodically refreshes polled
// values. A nil metrics argument disables metrics and is not an error.
//
// If any instrument cannot be created, its error is returned, the host's
// targetMetrics is left unset and no goroutine is started.
func (h *TargetHost) initMetrics(metrics *metrics) error {
	if metrics == nil {
		return nil
	}

	created := &targetMetrics{}
	var err error

	created.queueSizeGauge, err = metrics.collector.QueueSizeGauge(h.name)
	if err != nil {
		return err
	}
	created.loggedCounter, err = metrics.collector.LoggedCounter(h.name)
	if err != nil {
		return err
	}
	created.errorCounter, err = metrics.collector.ErrorCounter(h.name)
	if err != nil {
		return err
	}
	created.droppedCounter, err = metrics.collector.DroppedCounter(h.name)
	if err != nil {
		return err
	}
	created.blockedCounter, err = metrics.collector.BlockedCounter(h.name)
	if err != nil {
		return err
	}

	// Publish only once every instrument has been created successfully.
	h.targetMetrics = created

	// Zero means "use the default"; anything under 250ms is raised to 250ms.
	freq := metrics.updateFreqMillis
	if freq == 0 {
		freq = DefMetricsUpdateFreqMillis
	}
	if freq < 250 {
		freq = 250 // don't peg the CPU
	}

	go h.startMetricsUpdater(freq)
	return nil
}
// IsLevelEnabled asks this host's filter whether lvl should be emitted.
// It returns the filter's enabled flag first, followed by the Level value
// the filter reported for lvl.
func (h *TargetHost) IsLevelEnabled(lvl Level) (enabled bool, level Level) {
	reported, ok := h.filter.GetEnabledLevel(lvl)
	return ok, reported
}
// Shutdown stops processing log records and shuts down the hosted target.
//
// The first call marks the host as shut down (so Log discards further
// records), closes quit to stop the read loop, and then waits until either
// the read loop has exited or ctx is done, whichever comes first. The
// target's Shutdown is invoked in both cases and its error is returned.
//
// Any subsequent call returns an error without touching the target.
func (h *TargetHost) Shutdown(ctx context.Context) error {
	if previous := atomic.SwapInt32(&h.shutdown, 1); previous != 0 {
		return errors.New("targetHost shutdown called more than once")
	}

	// Signal the read loop to stop.
	close(h.quit)

	// Wait for the read loop to finish, bounded by the caller's context.
	select {
	case <-h.done:
	case <-ctx.Done():
	}

	return h.target.Shutdown()
}
// Log queues a log record to be output to this target's destination.
//
// Records are discarded without error once Shutdown has been called.
// If the queue has room the record is enqueued immediately. When the queue
// is full, the logger's onTargetQueueFull handler (if any) is consulted:
// a true result drops the record and increments the dropped counter.
// Otherwise the blocked counter is incremented and the call blocks until
// the record is enqueued or the logger's enqueueTimeout elapses, in which
// case an error is reported through the logger and the record is not
// enqueued.
func (h *TargetHost) Log(rec *LogRec) {
	if atomic.LoadInt32(&h.shutdown) != 0 {
		return
	}

	lgr := rec.Logger().Logr()

	// Fast path: queue has capacity.
	select {
	case h.in <- rec:
		return
	default:
	}

	// Queue is full; let the handler decide whether to drop the record.
	handler := lgr.options.onTargetQueueFull
	if handler != nil && handler(h.target, rec, cap(h.in)) {
		h.incDroppedCounter()
		return // drop the record
	}
	h.incBlockedCounter()

	// Use an explicit timer so it can be released as soon as the enqueue
	// succeeds instead of lingering until the timeout expires.
	timer := time.NewTimer(lgr.options.enqueueTimeout)
	defer timer.Stop()

	select {
	case h.in <- rec: // block until success or timeout
	case <-timer.C:
		lgr.ReportError(fmt.Errorf("target enqueue timeout for log rec [%v]", rec))
	}
}
// setQueueSizeGauge records val on the queue size gauge.
// It does nothing when metrics are disabled for this host.
func (h *TargetHost) setQueueSizeGauge(val float64) {
	tm := h.targetMetrics
	if tm == nil {
		return
	}
	tm.queueSizeGauge.Set(val)
}
// incLoggedCounter adds one to the logged counter.
// It does nothing when metrics are disabled for this host.
func (h *TargetHost) incLoggedCounter() {
	tm := h.targetMetrics
	if tm == nil {
		return
	}
	tm.loggedCounter.Inc()
}
// incErrorCounter adds one to the error counter.
// It does nothing when metrics are disabled for this host.
func (h *TargetHost) incErrorCounter() {
	tm := h.targetMetrics
	if tm == nil {
		return
	}
	tm.errorCounter.Inc()
}
// incDroppedCounter adds one to the dropped counter.
// It does nothing when metrics are disabled for this host.
func (h *TargetHost) incDroppedCounter() {
	tm := h.targetMetrics
	if tm == nil {
		return
	}
	tm.droppedCounter.Inc()
}
// incBlockedCounter adds one to the blocked counter.
// It does nothing when metrics are disabled for this host.
func (h *TargetHost) incBlockedCounter() {
	tm := h.targetMetrics
	if tm == nil {
		return
	}
	tm.blockedCounter.Inc()
}
// String returns a name for this target.
// The value is the host's name field, which newTargetHost sets from the
// supplied options or, when that is empty, from the target's type name.
func (h *TargetHost) String() string {
	return h.name
}
// start is the host's read loop. It receives records from the in channel
// until the quit channel is closed. A record carrying a flush channel
// triggers a queue flush; any other record is formatted and written to the
// target, updating the error or logged counter accordingly and reporting
// write errors through the record's logger.
//
// If the loop panics, the panic value is printed to stderr and the loop is
// restarted in a new goroutine. The done channel is closed only when the
// loop exits without panicking.
func (h *TargetHost) start() {
	defer func() {
		r := recover()
		if r == nil {
			// Normal exit: announce that the read loop has finished.
			close(h.done)
			return
		}
		fmt.Fprintln(os.Stderr, "TargetHost.start -- ", r)
		go h.start()
	}()

	for {
		select {
		case <-h.quit:
			return

		case rec := <-h.in:
			if rec.flush != nil {
				h.flush(rec.flush)
				continue
			}
			if err := h.writeRec(rec); err != nil {
				h.incErrorCounter()
				rec.Logger().Logr().ReportError(err)
				continue
			}
			h.incLoggedCounter()
		}
	}
}
// writeRec formats rec with the host's formatter and writes the result to
// the target. It returns an error if the record's level is not enabled by
// the host's filter, if formatting fails, or if the target's Write fails.
//
// The buffer used for formatting is borrowed from the record's logger and
// is released when writeRec returns.
func (h *TargetHost) writeRec(rec *LogRec) error {
	level, ok := h.filter.GetEnabledLevel(rec.Level())
	if !ok {
		// how did we get here?
		return fmt.Errorf("level %s not enabled for target %s", rec.Level().Name, h.name)
	}

	borrowed := rec.logger.lgr.BorrowBuffer()
	// The release is bound to the borrowed buffer itself, regardless of
	// what the formatter returns.
	defer rec.logger.lgr.ReleaseBuffer(borrowed)

	formatted, err := h.formatter.Format(rec, level, borrowed)
	if err != nil {
		return err
	}

	// Only the error from Write matters here; the byte count is discarded.
	_, writeErr := h.target.Write(formatted.Bytes(), rec)
	return writeErr
}
// startMetricsUpdater updates the metrics for any polled values every
// `updateFreqMillis` milliseconds until the host's done channel is closed.
// Currently the only polled value is the queue size gauge, which is set to
// the number of records waiting in the queue.
//
// It blocks until done is closed and is intended to run in its own goroutine.
func (h *TargetHost) startMetricsUpdater(updateFreqMillis int64) {
	interval := time.Duration(updateFreqMillis) * time.Millisecond

	// Reuse one timer for the lifetime of the loop rather than allocating a
	// new one per iteration, and release it as soon as the loop exits.
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		select {
		case <-h.done:
			return
		case <-timer.C:
			h.setQueueSizeGauge(float64(len(h.in)))
			// The timer has fired and its channel was just drained, so it
			// is safe to re-arm it for the next interval.
			timer.Reset(interval)
		}
	}
}
// flush drains the queue and notifies when done.
//
// Records are taken from the in channel without blocking and written to the
// target until the channel is empty, at which point a value is sent on done
// and flush returns. Each write updates the logged or error counter, and
// write errors are reported through the record's logger. Flush records
// encountered while draining are skipped; their own flush channels are not
// signalled.
func (h *TargetHost) flush(done chan<- struct{}) {
	for {
		select {
		case rec := <-h.in:
			// ignore any redundant flush records.
			if rec.flush != nil {
				continue
			}
			if err := h.writeRec(rec); err != nil {
				h.incErrorCounter()
				rec.Logger().Logr().ReportError(err)
				continue
			}
			// Count successful writes the same way the read loop does, so
			// records emitted during a flush are not missing from metrics.
			h.incLoggedCounter()
		default:
			// Queue is empty: signal completion to the flush requester.
			done <- struct{}{}
			return
		}
	}
}