mirror of
https://github.com/42wim/matterbridge.git
synced 2025-01-25 16:09:02 -08:00
180 lines
4.3 KiB
Go
180 lines
4.3 KiB
Go
|
package exec
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"reflect"
|
||
|
"time"
|
||
|
|
||
|
"github.com/graph-gophers/graphql-go/errors"
|
||
|
"github.com/graph-gophers/graphql-go/internal/exec/resolvable"
|
||
|
"github.com/graph-gophers/graphql-go/internal/exec/selected"
|
||
|
"github.com/graph-gophers/graphql-go/types"
|
||
|
)
|
||
|
|
||
|
type Response struct {
|
||
|
Data json.RawMessage
|
||
|
Errors []*errors.QueryError
|
||
|
}
|
||
|
|
||
|
func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *types.OperationDefinition) <-chan *Response {
|
||
|
var result reflect.Value
|
||
|
var f *fieldToExec
|
||
|
var err *errors.QueryError
|
||
|
func() {
|
||
|
defer r.handlePanic(ctx)
|
||
|
|
||
|
sels := selected.ApplyOperation(&r.Request, s, op)
|
||
|
var fields []*fieldToExec
|
||
|
collectFieldsToResolve(sels, s, s.Resolver, &fields, make(map[string]*fieldToExec))
|
||
|
|
||
|
// TODO: move this check into validation.Validate
|
||
|
if len(fields) != 1 {
|
||
|
err = errors.Errorf("%s", "can subscribe to at most one subscription at a time")
|
||
|
return
|
||
|
}
|
||
|
f = fields[0]
|
||
|
|
||
|
var in []reflect.Value
|
||
|
if f.field.HasContext {
|
||
|
in = append(in, reflect.ValueOf(ctx))
|
||
|
}
|
||
|
if f.field.ArgsPacker != nil {
|
||
|
in = append(in, f.field.PackedArgs)
|
||
|
}
|
||
|
callOut := f.resolver.Method(f.field.MethodIndex).Call(in)
|
||
|
result = callOut[0]
|
||
|
|
||
|
if f.field.HasError && !callOut[1].IsNil() {
|
||
|
switch resolverErr := callOut[1].Interface().(type) {
|
||
|
case *errors.QueryError:
|
||
|
err = resolverErr
|
||
|
case error:
|
||
|
err = errors.Errorf("%s", resolverErr)
|
||
|
err.ResolverError = resolverErr
|
||
|
default:
|
||
|
panic(fmt.Errorf("can only deal with *QueryError and error types, got %T", resolverErr))
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
// Handles the case where the locally executed func above panicked
|
||
|
if len(r.Request.Errs) > 0 {
|
||
|
return sendAndReturnClosed(&Response{Errors: r.Request.Errs})
|
||
|
}
|
||
|
|
||
|
if f == nil {
|
||
|
return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{err}})
|
||
|
}
|
||
|
|
||
|
if err != nil {
|
||
|
if _, nonNullChild := f.field.Type.(*types.NonNull); nonNullChild {
|
||
|
return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{err}})
|
||
|
}
|
||
|
return sendAndReturnClosed(&Response{Data: []byte(fmt.Sprintf(`{"%s":null}`, f.field.Alias)), Errors: []*errors.QueryError{err}})
|
||
|
}
|
||
|
|
||
|
if ctxErr := ctx.Err(); ctxErr != nil {
|
||
|
return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{errors.Errorf("%s", ctxErr)}})
|
||
|
}
|
||
|
|
||
|
c := make(chan *Response)
|
||
|
// TODO: handle resolver nil channel better?
|
||
|
if result.IsZero() {
|
||
|
close(c)
|
||
|
return c
|
||
|
}
|
||
|
|
||
|
go func() {
|
||
|
for {
|
||
|
// Check subscription context
|
||
|
chosen, resp, ok := reflect.Select([]reflect.SelectCase{
|
||
|
{
|
||
|
Dir: reflect.SelectRecv,
|
||
|
Chan: reflect.ValueOf(ctx.Done()),
|
||
|
},
|
||
|
{
|
||
|
Dir: reflect.SelectRecv,
|
||
|
Chan: result,
|
||
|
},
|
||
|
})
|
||
|
switch chosen {
|
||
|
// subscription context done
|
||
|
case 0:
|
||
|
close(c)
|
||
|
return
|
||
|
// upstream received
|
||
|
case 1:
|
||
|
// upstream closed
|
||
|
if !ok {
|
||
|
close(c)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
subR := &Request{
|
||
|
Request: selected.Request{
|
||
|
Doc: r.Request.Doc,
|
||
|
Vars: r.Request.Vars,
|
||
|
Schema: r.Request.Schema,
|
||
|
},
|
||
|
Limiter: r.Limiter,
|
||
|
Tracer: r.Tracer,
|
||
|
Logger: r.Logger,
|
||
|
}
|
||
|
var out bytes.Buffer
|
||
|
func() {
|
||
|
timeout := r.SubscribeResolverTimeout
|
||
|
if timeout == 0 {
|
||
|
timeout = time.Second
|
||
|
}
|
||
|
|
||
|
subCtx, cancel := context.WithTimeout(ctx, timeout)
|
||
|
defer cancel()
|
||
|
|
||
|
// resolve response
|
||
|
func() {
|
||
|
defer subR.handlePanic(subCtx)
|
||
|
|
||
|
var buf bytes.Buffer
|
||
|
subR.execSelectionSet(subCtx, f.sels, f.field.Type, &pathSegment{nil, f.field.Alias}, s, resp, &buf)
|
||
|
|
||
|
propagateChildError := false
|
||
|
if _, nonNullChild := f.field.Type.(*types.NonNull); nonNullChild && resolvedToNull(&buf) {
|
||
|
propagateChildError = true
|
||
|
}
|
||
|
|
||
|
if !propagateChildError {
|
||
|
out.WriteString(fmt.Sprintf(`{"%s":`, f.field.Alias))
|
||
|
out.Write(buf.Bytes())
|
||
|
out.WriteString(`}`)
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
if err := subCtx.Err(); err != nil {
|
||
|
c <- &Response{Errors: []*errors.QueryError{errors.Errorf("%s", err)}}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Send response within timeout
|
||
|
// TODO: maybe block until sent?
|
||
|
select {
|
||
|
case <-subCtx.Done():
|
||
|
case c <- &Response{Data: out.Bytes(), Errors: subR.Errs}:
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
return c
|
||
|
}
|
||
|
|
||
|
func sendAndReturnClosed(resp *Response) chan *Response {
|
||
|
c := make(chan *Response, 1)
|
||
|
c <- resp
|
||
|
close(c)
|
||
|
return c
|
||
|
}
|