core.go 10.3 KB
package pipeline

import (
	"context"
	"net"
	"net/http"
	"os"
	"time"
)

// The Factory interface represents an object that can create its Policy object. Each HTTP request sent
// requires that this Factory create a new instance of its Policy object.
//
// New receives the Policy that follows the new one in the linked-list (next) and the PolicyOptions
// built for the request (po), and returns the newly created Policy. The pipeline passes a nil next
// to the HTTPSender factory because its Policy is the last node in the list.
type Factory interface {
	New(next Policy, po *PolicyOptions) Policy
}

// FactoryFunc is an adapter that allows the use of an ordinary function as a Factory interface.
// The function takes the same arguments as Factory's New method but returns a PolicyFunc
// rather than a Policy.
type FactoryFunc func(next Policy, po *PolicyOptions) PolicyFunc

// New invokes the wrapped function with next and po and hands back the PolicyFunc it
// produces as a Policy; this is what makes a FactoryFunc usable as a Factory.
func (f FactoryFunc) New(next Policy, po *PolicyOptions) Policy {
	policy := f(next, po)
	return policy
}

// The Policy interface represents a mutable Policy object created by a Factory. The object can mutate/process
// the HTTP request and then forward it on to the next Policy object in the linked-list. The returned
// Response goes backward through the linked-list for additional processing.
//
// NOTE: Request is passed by value, so changes made by a Policy do not change the caller's version of
// the request. However, Request has some fields that reference mutable objects (not strings).
// These references are copied; a deep copy is not performed. Specifically, this means that
// you should avoid modifying the objects referred to by these fields: URL, Header, Body,
// GetBody, TransferEncoding, Form, MultipartForm, Trailer, TLS, Cancel, and Response.
type Policy interface {
	Do(ctx context.Context, request Request) (Response, error)
}

// PolicyFunc is an adapter that allows the use of an ordinary function as a Policy interface.
// The function has the same signature as Policy's Do method.
type PolicyFunc func(ctx context.Context, request Request) (Response, error)

// Do forwards ctx and request to the wrapped function and returns its Response and
// error unchanged; this is what makes a PolicyFunc usable as a Policy.
func (f PolicyFunc) Do(ctx context.Context, request Request) (Response, error) {
	response, err := f(ctx, request)
	return response, err
}

// Options configures a Pipeline's behavior.
type Options struct {
	// HTTPSender creates the last Policy in the linked-list, the one that sends the request over the wire.
	// If HTTPSender is nil, then the pipeline's default client is used to send the HTTP requests.
	HTTPSender Factory
	// Log configures the pipeline's logging function and level filtering.
	Log        LogOptions
}

// LogLevel tells a logger the minimum severity to log. When code reports a log entry,
// the LogLevel indicates the severity of the log entry. The logger only records entries
// that are at least as severe as the level it was told to log. See the Log* constants.
// For example, if a logger is configured with LogError, then LogError, LogPanic,
// and LogFatal entries will be logged; less severe entries are ignored.
//
// NOTE: More severe levels have SMALLER numeric values (LogFatal < LogPanic < ... < LogDebug),
// so a filter for "LogWarning and more severe" is written as: level <= LogWarning.
type LogLevel uint32

// The log levels, declared from most severe (smallest value) to most verbose (largest value).
// LogNone is the zero value.
const (
	// LogNone tells a logger not to log any entries passed to it.
	LogNone LogLevel = iota

	// LogFatal tells a logger to log all LogFatal entries passed to it.
	LogFatal

	// LogPanic tells a logger to log all LogPanic and LogFatal entries passed to it.
	LogPanic

	// LogError tells a logger to log all LogError, LogPanic and LogFatal entries passed to it.
	LogError

	// LogWarning tells a logger to log all LogWarning, LogError, LogPanic and LogFatal entries passed to it.
	LogWarning

	// LogInfo tells a logger to log all LogInfo, LogWarning, LogError, LogPanic and LogFatal entries passed to it.
	LogInfo

	// LogDebug tells a logger to log all LogDebug, LogInfo, LogWarning, LogError, LogPanic and LogFatal entries passed to it.
	LogDebug
)

// LogOptions configures the pipeline's logging mechanism & level filtering.
type LogOptions struct {
	// Log is called with the level and message of every entry that passes the ShouldLog filter.
	// If nil, NewPipeline installs a logger that does nothing.
	Log func(level LogLevel, message string)

	// ShouldLog is called periodically allowing you to return whether the specified LogLevel should be logged or not.
	// An application can return different values over its lifetime; this allows the application to dynamically
	// alter what is logged. NOTE: This method can be called by multiple goroutines simultaneously so make sure
	// you implement it in a goroutine-safe way. If nil, nothing is logged (the equivalent of always returning false).
	// Usually, the function will be implemented simply like this: return level <= LogWarning
	ShouldLog func(level LogLevel) bool
}

// pipeline is the implementation of the Pipeline interface returned by NewPipeline.
type pipeline struct {
	// factories is the ordered list of Factory objects; each one creates a Policy for every request.
	factories []Factory
	// options holds the settings passed to NewPipeline, with nil HTTPSender/Log.Log replaced by defaults.
	options   Options
}

// The Pipeline interface represents an ordered list of Factory objects plus the HTTPSender Factory from the Options.
// You construct a Pipeline by calling the pipeline.NewPipeline function. To send an HTTP request, call pipeline.NewRequest
// and then call Pipeline's Do method passing a context, a method-specific Factory (or nil), and the request. Passing a
// method-specific Factory allows this one call to Do to inject a Policy into the linked-list. The policy is injected where
// the MethodFactoryMarker (see the pipeline.MethodFactoryMarker function) is in the slice of Factory objects.
//
// When Do is called, the Pipeline object asks each Factory object to construct its Policy object and adds each Policy to a linked-list.
// Then, Do sends the Context and Request through all the Policy objects. The final Policy object sends the request over the network
// (via the HTTPSender Factory in the Options passed to NewPipeline) and the response is returned backwards through all the Policy objects.
// Since Pipeline and Factory objects are goroutine-safe, you typically create 1 Pipeline object and reuse it to make many HTTP requests.
type Pipeline interface {
	Do(ctx context.Context, methodFactory Factory, request Request) (Response, error)
}

// NewPipeline creates a new goroutine-safe Pipeline object from the slice of Factory objects and the specified options.
//
// The factories slice is copied, so the returned Pipeline is unaffected if the caller later modifies
// the slice or appends to a slice that shares its backing array. The order of the Factory objects is kept.
//
// If o.HTTPSender is nil, the default HTTP client factory is used as the sender. If o.Log.Log is nil,
// a logger that does nothing is installed. o is received by value, so applying these defaults does not
// modify the caller's Options.
func NewPipeline(factories []Factory, o Options) Pipeline {
	if o.HTTPSender == nil {
		o.HTTPSender = newDefaultHTTPClientFactory()
	}
	if o.Log.Log == nil {
		o.Log.Log = func(LogLevel, string) {} // No-op logger
	}

	// Take a private copy: the pipeline reads this slice on every Do call without locking,
	// so it must not alias memory that the caller can still write to.
	owned := append([]Factory(nil), factories...)
	return &pipeline{factories: owned, options: o}
}

// Do is invoked once per HTTP request. It builds a fresh linked-list of Policy objects (one per Factory,
// with any MethodFactoryMarker replaced by methodFactory), passes ctx and request to the head of that list,
// and closes the request once the list has returned. The Response and error produced by the head Policy
// are returned to the caller as-is.
func (p *pipeline) Do(ctx context.Context, methodFactory Factory, request Request) (Response, error) {
	head := p.newPolicies(methodFactory)
	resp, err := head.Do(ctx, request)
	request.close()
	return resp, err
}

// newPolicies builds the linked-list of Policy objects for a single request and returns its head.
//
// The list is assembled back to front: the HTTPSender's Policy (the node that talks to the wire) is
// created first with a nil next, and then each Factory in p.factories, visited in reverse, wraps the
// list built so far. A MethodFactoryMarker entry is replaced by methodFactory when methodFactory is
// non-nil and skipped otherwise.
//
// It panics if the slice contains more than one MethodFactoryMarker, or if methodFactory is non-nil
// but the slice contains no MethodFactoryMarker.
func (p *pipeline) newPolicies(methodFactory Factory) Policy {
	// A single PolicyOptions is handed to every Factory.
	opts := &PolicyOptions{pipeline: p}

	// Tail of the list: the sender, overridable through Options.HTTPSender.
	head := p.options.HTTPSender.New(nil, opts)

	markerSeen := false
	for idx := len(p.factories) - 1; idx >= 0; idx-- {
		switch f := p.factories[idx].(type) {
		case methodFactoryMarker:
			if markerSeen {
				panic("MethodFactoryMarker can only appear once in the pipeline")
			}
			markerSeen = true
			if methodFactory != nil {
				// The per-call factory takes the marker's position in the list.
				head = methodFactory.New(head, opts)
			}
		default:
			// Ordinary factory: let it wrap everything built so far.
			head = f.New(head, opts)
		}
	}

	// A method factory with nowhere to go is a programming error.
	if !markerSeen && methodFactory != nil {
		panic("Non-nil methodFactory requires MethodFactoryMarker in the pipeline")
	}
	return head
}

// A PolicyOptions represents optional information that can be used by a node in the
// linked-list of Policy objects. A PolicyOptions is passed to the Factory's New method
// which passes it (if desired) to the Policy object it creates. Today, the Policy object
// uses the options to perform logging. But, in the future, this could be used for more.
type PolicyOptions struct {
	// pipeline is the pipeline that created this object; its Log options back ShouldLog and Log.
	pipeline *pipeline
}

// ShouldLog reports whether an entry of the given level should be logged. The decision is
// delegated to the ShouldLog callback in the pipeline's LogOptions; when no callback was
// supplied, nothing is logged and false is returned.
func (po *PolicyOptions) ShouldLog(level LogLevel) bool {
	filter := po.pipeline.options.Log.ShouldLog
	if filter == nil {
		return false
	}
	return filter(level)
}

// Log writes msg to the pipeline's logger if ShouldLog allows the given level; otherwise it
// returns immediately. A trailing newline is appended to msg when it does not already end
// with one. After the logger has been called, a LogFatal entry terminates the process with
// exit code 1 and a LogPanic entry panics with the (newline-terminated) message.
func (po *PolicyOptions) Log(level LogLevel, msg string) {
	if !po.ShouldLog(level) {
		return // Filtered out; do no further work
	}

	// Every message handed to the logger ends with a newline.
	if n := len(msg); n == 0 || msg[n-1] != '\n' {
		msg += "\n"
	}
	po.pipeline.options.Log.Log(level, msg)

	// The logger may simply return for fatal/panic entries, so enforce their semantics here.
	switch level {
	case LogFatal:
		os.Exit(1)
	case LogPanic:
		panic(msg)
	}
}

// pipelineHTTPClient is the http.Client used by the default HTTPSender factory to send requests.
// It is created once, when the package is initialized.
var pipelineHTTPClient = newDefaultHTTPClient()

// newDefaultHTTPClient builds the http.Client used when no custom HTTPSender is supplied.
// Its Transport takes proxy settings from the environment, keeps up to 100 idle connections
// per host (with no overall cap on idle connections), and leaves keep-alives and compression
// enabled. ResponseHeaderTimeout is not set.
func newDefaultHTTPClient() *http.Client {
	dialer := &net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
		DualStack: true,
	}

	// We want the Transport to have a large connection pool.
	// Dial is used instead of DialContext on purpose: DialContext has been reported to cause slower performance.
	transport := &http.Transport{
		Proxy:                  http.ProxyFromEnvironment,
		Dial:                   dialer.Dial,
		MaxIdleConns:           0, // No limit
		MaxIdleConnsPerHost:    100,
		IdleConnTimeout:        90 * time.Second,
		TLSHandshakeTimeout:    10 * time.Second,
		ExpectContinueTimeout:  1 * time.Second,
		DisableKeepAlives:      false,
		DisableCompression:     false,
		MaxResponseHeaderBytes: 0,
	}
	return &http.Client{Transport: transport}
}

// newDefaultHTTPClientFactory returns the Factory used as the HTTPSender when the caller does not
// supply one. The Policy it creates ignores next and po: it attaches ctx to the request, sends it with
// the package's pipelineHTTPClient, and wraps the result with NewHTTPResponse. A send failure is
// wrapped with NewError using the message "HTTP request failed".
func newDefaultHTTPClientFactory() Factory {
	send := func(ctx context.Context, request Request) (Response, error) {
		httpResp, err := pipelineHTTPClient.Do(request.WithContext(ctx))
		if err != nil {
			err = NewError(err, "HTTP request failed")
		}
		return NewHTTPResponse(httpResp), err
	}
	return FactoryFunc(func(next Policy, po *PolicyOptions) PolicyFunc {
		return send
	})
}

// mfm is the marker value returned by every call to MethodFactoryMarker.
var mfm = methodFactoryMarker{} // Singleton

// MethodFactoryMarker returns a special marker Factory object. When Pipeline's Do method is called, the
// MethodFactoryMarker object is replaced with the methodFactory passed to Do. If nil is passed for Do's
// methodFactory parameter, then the MethodFactoryMarker is skipped as the linked-list of Policy objects is created.
// A pipeline may contain at most one MethodFactoryMarker.
func MethodFactoryMarker() Factory {
	var marker Factory = mfm
	return marker
}

// methodFactoryMarker is the placeholder Factory type that the pipeline recognizes (by type) and
// swaps for the method-specific Factory passed to Do. It carries no state.
type methodFactoryMarker struct {
}

// New exists only so that methodFactoryMarker satisfies the Factory interface. The pipeline
// replaces or skips the marker instead of calling it, so New always panics.
func (methodFactoryMarker) New(next Policy, po *PolicyOptions) Policy {
	const msg = "methodFactoryMarker policy should have been replaced with a method policy"
	panic(msg)
}