forked from lug/matterbridge
		
	
		
			
				
	
	
		
			305 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			305 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package logr
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"os"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| // Target represents a destination for log records such as file,
 | |
| // database, TCP socket, etc.
 | |
| type Target interface {
 | |
| 	// Init is called once to initialize the target.
 | |
| 	Init() error
 | |
| 
 | |
| 	// Write outputs to this target's destination.
 | |
| 	Write(p []byte, rec *LogRec) (int, error)
 | |
| 
 | |
| 	// Shutdown is called once to free/close any resources.
 | |
| 	// Target queue is already drained when this is called.
 | |
| 	Shutdown() error
 | |
| }
 | |
| 
 | |
| type targetMetrics struct {
 | |
| 	queueSizeGauge Gauge
 | |
| 	loggedCounter  Counter
 | |
| 	errorCounter   Counter
 | |
| 	droppedCounter Counter
 | |
| 	blockedCounter Counter
 | |
| }
 | |
| 
 | |
| type targetHostOptions struct {
 | |
| 	name         string
 | |
| 	filter       Filter
 | |
| 	formatter    Formatter
 | |
| 	maxQueueSize int
 | |
| 	metrics      *metrics
 | |
| }
 | |
| 
 | |
| // TargetHost hosts and manages the lifecycle of a target.
 | |
| // Incoming log records are queued and formatted before
 | |
| // being passed to the target.
 | |
| type TargetHost struct {
 | |
| 	target Target
 | |
| 	name   string
 | |
| 
 | |
| 	filter    Filter
 | |
| 	formatter Formatter
 | |
| 
 | |
| 	in            chan *LogRec
 | |
| 	quit          chan struct{} // closed by Shutdown to exit read loop
 | |
| 	done          chan struct{} // closed when read loop exited
 | |
| 	targetMetrics *targetMetrics
 | |
| 
 | |
| 	shutdown int32
 | |
| }
 | |
| 
 | |
| func newTargetHost(target Target, options targetHostOptions) (*TargetHost, error) {
 | |
| 	host := &TargetHost{
 | |
| 		target:    target,
 | |
| 		name:      options.name,
 | |
| 		filter:    options.filter,
 | |
| 		formatter: options.formatter,
 | |
| 		in:        make(chan *LogRec, options.maxQueueSize),
 | |
| 		quit:      make(chan struct{}),
 | |
| 		done:      make(chan struct{}),
 | |
| 	}
 | |
| 
 | |
| 	if host.name == "" {
 | |
| 		host.name = fmt.Sprintf("%T", target)
 | |
| 	}
 | |
| 
 | |
| 	if host.filter == nil {
 | |
| 		host.filter = &StdFilter{Lvl: Fatal}
 | |
| 	}
 | |
| 	if host.formatter == nil {
 | |
| 		host.formatter = &DefaultFormatter{}
 | |
| 	}
 | |
| 
 | |
| 	err := host.initMetrics(options.metrics)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	err = target.Init()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	go host.start()
 | |
| 
 | |
| 	return host, nil
 | |
| }
 | |
| 
 | |
| func (h *TargetHost) initMetrics(metrics *metrics) error {
 | |
| 	if metrics == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	var err error
 | |
| 	tmetrics := &targetMetrics{}
 | |
| 
 | |
| 	if tmetrics.queueSizeGauge, err = metrics.collector.QueueSizeGauge(h.name); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if tmetrics.loggedCounter, err = metrics.collector.LoggedCounter(h.name); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if tmetrics.errorCounter, err = metrics.collector.ErrorCounter(h.name); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if tmetrics.droppedCounter, err = metrics.collector.DroppedCounter(h.name); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if tmetrics.blockedCounter, err = metrics.collector.BlockedCounter(h.name); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	h.targetMetrics = tmetrics
 | |
| 
 | |
| 	updateFreqMillis := metrics.updateFreqMillis
 | |
| 	if updateFreqMillis == 0 {
 | |
| 		updateFreqMillis = DefMetricsUpdateFreqMillis
 | |
| 	}
 | |
| 	if updateFreqMillis < 250 {
 | |
| 		updateFreqMillis = 250 // don't peg the CPU
 | |
| 	}
 | |
| 
 | |
| 	go h.startMetricsUpdater(updateFreqMillis)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // IsLevelEnabled returns true if this target should emit logs for the specified level.
 | |
| func (h *TargetHost) IsLevelEnabled(lvl Level) (enabled bool, level Level) {
 | |
| 	level, enabled = h.filter.GetEnabledLevel(lvl)
 | |
| 	return enabled, level
 | |
| }
 | |
| 
 | |
| // Shutdown stops processing log records after making best
 | |
| // effort to flush queue.
 | |
| func (h *TargetHost) Shutdown(ctx context.Context) error {
 | |
| 	if atomic.SwapInt32(&h.shutdown, 1) != 0 {
 | |
| 		return errors.New("targetHost shutdown called more than once")
 | |
| 	}
 | |
| 
 | |
| 	close(h.quit)
 | |
| 
 | |
| 	// No more records can be accepted; now wait for read loop to exit.
 | |
| 	select {
 | |
| 	case <-ctx.Done():
 | |
| 	case <-h.done:
 | |
| 	}
 | |
| 
 | |
| 	// b.in channel should now be drained.
 | |
| 	return h.target.Shutdown()
 | |
| }
 | |
| 
 | |
| // Log queues a log record to be output to this target's destination.
 | |
| func (h *TargetHost) Log(rec *LogRec) {
 | |
| 	if atomic.LoadInt32(&h.shutdown) != 0 {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	lgr := rec.Logger().Logr()
 | |
| 	select {
 | |
| 	case h.in <- rec:
 | |
| 	default:
 | |
| 		handler := lgr.options.onTargetQueueFull
 | |
| 		if handler != nil && handler(h.target, rec, cap(h.in)) {
 | |
| 			h.incDroppedCounter()
 | |
| 			return // drop the record
 | |
| 		}
 | |
| 		h.incBlockedCounter()
 | |
| 
 | |
| 		select {
 | |
| 		case <-time.After(lgr.options.enqueueTimeout):
 | |
| 			lgr.ReportError(fmt.Errorf("target enqueue timeout for log rec [%v]", rec))
 | |
| 		case h.in <- rec: // block until success or timeout
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (h *TargetHost) setQueueSizeGauge(val float64) {
 | |
| 	if h.targetMetrics != nil {
 | |
| 		h.targetMetrics.queueSizeGauge.Set(val)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (h *TargetHost) incLoggedCounter() {
 | |
| 	if h.targetMetrics != nil {
 | |
| 		h.targetMetrics.loggedCounter.Inc()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (h *TargetHost) incErrorCounter() {
 | |
| 	if h.targetMetrics != nil {
 | |
| 		h.targetMetrics.errorCounter.Inc()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (h *TargetHost) incDroppedCounter() {
 | |
| 	if h.targetMetrics != nil {
 | |
| 		h.targetMetrics.droppedCounter.Inc()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (h *TargetHost) incBlockedCounter() {
 | |
| 	if h.targetMetrics != nil {
 | |
| 		h.targetMetrics.blockedCounter.Inc()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // String returns a name for this target.
 | |
| func (h *TargetHost) String() string {
 | |
| 	return h.name
 | |
| }
 | |
| 
 | |
| // start accepts log records via In channel and writes to the
 | |
| // supplied target, until Done channel signaled.
 | |
| func (h *TargetHost) start() {
 | |
| 	defer func() {
 | |
| 		if r := recover(); r != nil {
 | |
| 			fmt.Fprintln(os.Stderr, "TargetHost.start -- ", r)
 | |
| 			go h.start()
 | |
| 		} else {
 | |
| 			close(h.done)
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	for {
 | |
| 		var rec *LogRec
 | |
| 		select {
 | |
| 		case rec = <-h.in:
 | |
| 			if rec.flush != nil {
 | |
| 				h.flush(rec.flush)
 | |
| 			} else {
 | |
| 				err := h.writeRec(rec)
 | |
| 				if err != nil {
 | |
| 					h.incErrorCounter()
 | |
| 					rec.Logger().Logr().ReportError(err)
 | |
| 				} else {
 | |
| 					h.incLoggedCounter()
 | |
| 				}
 | |
| 			}
 | |
| 		case <-h.quit:
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (h *TargetHost) writeRec(rec *LogRec) error {
 | |
| 	level, enabled := h.filter.GetEnabledLevel(rec.Level())
 | |
| 	if !enabled {
 | |
| 		// how did we get here?
 | |
| 		return fmt.Errorf("level %s not enabled for target %s", rec.Level().Name, h.name)
 | |
| 	}
 | |
| 
 | |
| 	buf := rec.logger.lgr.BorrowBuffer()
 | |
| 	defer rec.logger.lgr.ReleaseBuffer(buf)
 | |
| 
 | |
| 	buf, err := h.formatter.Format(rec, level, buf)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	_, err = h.target.Write(buf.Bytes(), rec)
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // startMetricsUpdater updates the metrics for any polled values every `updateFreqMillis` seconds until
 | |
| // target is shut down.
 | |
| func (h *TargetHost) startMetricsUpdater(updateFreqMillis int64) {
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-h.done:
 | |
| 			return
 | |
| 		case <-time.After(time.Duration(updateFreqMillis) * time.Millisecond):
 | |
| 			h.setQueueSizeGauge(float64(len(h.in)))
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // flush drains the queue and notifies when done.
 | |
| func (h *TargetHost) flush(done chan<- struct{}) {
 | |
| 	for {
 | |
| 		var rec *LogRec
 | |
| 		var err error
 | |
| 		select {
 | |
| 		case rec = <-h.in:
 | |
| 			// ignore any redundant flush records.
 | |
| 			if rec.flush == nil {
 | |
| 				err = h.writeRec(rec)
 | |
| 				if err != nil {
 | |
| 					h.incErrorCounter()
 | |
| 					rec.Logger().Logr().ReportError(err)
 | |
| 				}
 | |
| 			}
 | |
| 		default:
 | |
| 			done <- struct{}{}
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | 
