matterbridge/vendor/github.com/graph-gophers/graphql-go/internal/exec/subscribe.go
2022-04-01 00:23:19 +02:00

180 lines
4.3 KiB
Go

package exec
import (
"bytes"
"context"
"encoding/json"
"fmt"
"reflect"
"time"
"github.com/graph-gophers/graphql-go/errors"
"github.com/graph-gophers/graphql-go/internal/exec/resolvable"
"github.com/graph-gophers/graphql-go/internal/exec/selected"
"github.com/graph-gophers/graphql-go/types"
)
// Response is a single message delivered on the channel returned by
// (*Request).Subscribe.
type Response struct {
// Data is the raw JSON payload. Subscribe fills it with an object keyed by
// the subscription field's alias; it is left empty when only errors are
// reported.
Data json.RawMessage
// Errors holds the query errors that accompany this message, if any.
Errors []*errors.QueryError
}
// Subscribe starts the single subscription selected by op and returns a
// channel on which one Response is delivered per event received from the
// resolver's channel.
//
// The subscription resolver method is invoked synchronously through
// reflection. If the operation does not select exactly one field, the resolver
// call panics or returns an error, or ctx is already done, the returned
// channel holds exactly one Response describing the failure and is already
// closed. If the resolver returns a zero value (for example a nil channel),
// the returned channel is closed without carrying any Response.
//
// Otherwise a goroutine forwards events until ctx is done or the resolver's
// channel is closed, and then closes the returned channel. Each event is
// resolved under a timeout of r.SubscribeResolverTimeout, defaulting to one
// second when that field is zero.
func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *types.OperationDefinition) <-chan *Response {
	var result reflect.Value
	var f *fieldToExec
	var err *errors.QueryError
	func() {
		// A panic in the resolver call below is recovered here; the check on
		// r.Request.Errs after this closure relies on handlePanic having
		// recorded it there.
		defer r.handlePanic(ctx)

		sels := selected.ApplyOperation(&r.Request, s, op)
		var fields []*fieldToExec
		collectFieldsToResolve(sels, s, s.Resolver, &fields, make(map[string]*fieldToExec))

		// TODO: move this check into validation.Validate
		if len(fields) != 1 {
			err = errors.Errorf("%s", "can subscribe to at most one subscription at a time")
			return
		}
		f = fields[0]

		// Build the reflective argument list: optional context first, then
		// the optional packed arguments.
		var in []reflect.Value
		if f.field.HasContext {
			in = append(in, reflect.ValueOf(ctx))
		}
		if f.field.ArgsPacker != nil {
			in = append(in, f.field.PackedArgs)
		}
		callOut := f.resolver.Method(f.field.MethodIndex).Call(in)
		result = callOut[0]

		if f.field.HasError && !callOut[1].IsNil() {
			switch resolverErr := callOut[1].Interface().(type) {
			case *errors.QueryError:
				err = resolverErr
			case error:
				err = errors.Errorf("%s", resolverErr)
				err.ResolverError = resolverErr
			default:
				panic(fmt.Errorf("can only deal with *QueryError and error types, got %T", resolverErr))
			}
		}
	}()

	// Handles the case where the locally executed func above panicked
	if len(r.Request.Errs) > 0 {
		return sendAndReturnClosed(&Response{Errors: r.Request.Errs})
	}

	// No field was selected for execution, so only the error can be reported.
	if f == nil {
		return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{err}})
	}

	if err != nil {
		// A non-null field cannot be reported as null, so no data is sent.
		if _, nonNullChild := f.field.Type.(*types.NonNull); nonNullChild {
			return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{err}})
		}
		return sendAndReturnClosed(&Response{Data: []byte(fmt.Sprintf(`{"%s":null}`, f.field.Alias)), Errors: []*errors.QueryError{err}})
	}

	if ctxErr := ctx.Err(); ctxErr != nil {
		return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{errors.Errorf("%s", ctxErr)}})
	}

	c := make(chan *Response)
	// TODO: handle resolver nil channel better?
	if result.IsZero() {
		close(c)
		return c
	}

	go func() {
		for {
			// Check subscription context
			chosen, resp, ok := reflect.Select([]reflect.SelectCase{
				{
					Dir:  reflect.SelectRecv,
					Chan: reflect.ValueOf(ctx.Done()),
				},
				{
					Dir:  reflect.SelectRecv,
					Chan: result,
				},
			})
			switch chosen {
			// subscription context done
			case 0:
				close(c)
				return
			// upstream received
			case 1:
				// upstream closed
				if !ok {
					close(c)
					return
				}

				// Each event is resolved with a fresh Request so that errors
				// collected for one event do not carry over to the next.
				subR := &Request{
					Request: selected.Request{
						Doc:    r.Request.Doc,
						Vars:   r.Request.Vars,
						Schema: r.Request.Schema,
					},
					Limiter: r.Limiter,
					Tracer:  r.Tracer,
					Logger:  r.Logger,
				}
				var out bytes.Buffer
				// The closure scopes the deferred cancel to a single event.
				func() {
					timeout := r.SubscribeResolverTimeout
					if timeout == 0 {
						timeout = time.Second
					}

					subCtx, cancel := context.WithTimeout(ctx, timeout)
					defer cancel()

					// resolve response
					func() {
						defer subR.handlePanic(subCtx)

						var buf bytes.Buffer
						subR.execSelectionSet(subCtx, f.sels, f.field.Type, &pathSegment{nil, f.field.Alias}, s, resp, &buf)

						// A non-null field that resolved to null yields no
						// data at all; only the errors are sent.
						propagateChildError := false
						if _, nonNullChild := f.field.Type.(*types.NonNull); nonNullChild && resolvedToNull(&buf) {
							propagateChildError = true
						}

						if !propagateChildError {
							fmt.Fprintf(&out, `{"%s":`, f.field.Alias)
							out.Write(buf.Bytes())
							out.WriteString(`}`)
						}
					}()

					if err := subCtx.Err(); err != nil {
						// subCtx is already done, so the send is guarded by
						// the parent ctx instead. An unconditional send would
						// block forever once the consumer stops reading,
						// leaking this goroutine and leaving c unclosed.
						select {
						case c <- &Response{Errors: []*errors.QueryError{errors.Errorf("%s", err)}}:
						case <-ctx.Done():
						}
						return
					}

					// Send response within timeout
					// TODO: maybe block until sent?
					select {
					case <-subCtx.Done():
					case c <- &Response{Data: out.Bytes(), Errors: subR.Errs}:
					}
				}()
			}
		}
	}()

	return c
}
// sendAndReturnClosed returns an already-closed channel whose single buffered
// element is resp, so a receiver reads resp once and then observes the close.
func sendAndReturnClosed(resp *Response) chan *Response {
	out := make(chan *Response, 1)
	// The deferred close runs after the send below, so resp is buffered
	// before the channel is closed.
	defer close(out)
	out <- resp
	return out
}