mirror of
https://github.com/42wim/matterbridge.git
synced 2025-01-06 23:49:04 -08:00
97 lines
3.1 KiB
Go
97 lines
3.1 KiB
Go
|
package graphql
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
|
||
|
qerrors "github.com/graph-gophers/graphql-go/errors"
|
||
|
"github.com/graph-gophers/graphql-go/internal/common"
|
||
|
"github.com/graph-gophers/graphql-go/internal/exec"
|
||
|
"github.com/graph-gophers/graphql-go/internal/exec/resolvable"
|
||
|
"github.com/graph-gophers/graphql-go/internal/exec/selected"
|
||
|
"github.com/graph-gophers/graphql-go/internal/query"
|
||
|
"github.com/graph-gophers/graphql-go/internal/validation"
|
||
|
"github.com/graph-gophers/graphql-go/introspection"
|
||
|
)
|
||
|
|
||
|
// Subscribe returns a response channel for the given subscription with the schema's
|
||
|
// resolver. It returns an error if the schema was created without a resolver.
|
||
|
// If the context gets cancelled, the response channel will be closed and no
|
||
|
// further resolvers will be called. The context error will be returned as soon
|
||
|
// as possible (not immediately).
|
||
|
func (s *Schema) Subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}) (<-chan interface{}, error) {
|
||
|
if !s.res.Resolver.IsValid() {
|
||
|
return nil, errors.New("schema created without resolver, can not subscribe")
|
||
|
}
|
||
|
if _, ok := s.schema.EntryPoints["subscription"]; !ok {
|
||
|
return nil, errors.New("no subscriptions are offered by the schema")
|
||
|
}
|
||
|
return s.subscribe(ctx, queryString, operationName, variables, s.res), nil
|
||
|
}
|
||
|
|
||
|
func (s *Schema) subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}, res *resolvable.Schema) <-chan interface{} {
|
||
|
doc, qErr := query.Parse(queryString)
|
||
|
if qErr != nil {
|
||
|
return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{qErr}})
|
||
|
}
|
||
|
|
||
|
validationFinish := s.validationTracer.TraceValidation(ctx)
|
||
|
errs := validation.Validate(s.schema, doc, variables, s.maxDepth)
|
||
|
validationFinish(errs)
|
||
|
if len(errs) != 0 {
|
||
|
return sendAndReturnClosed(&Response{Errors: errs})
|
||
|
}
|
||
|
|
||
|
op, err := getOperation(doc, operationName)
|
||
|
if err != nil {
|
||
|
return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{qerrors.Errorf("%s", err)}})
|
||
|
}
|
||
|
|
||
|
r := &exec.Request{
|
||
|
Request: selected.Request{
|
||
|
Doc: doc,
|
||
|
Vars: variables,
|
||
|
Schema: s.schema,
|
||
|
},
|
||
|
Limiter: make(chan struct{}, s.maxParallelism),
|
||
|
Tracer: s.tracer,
|
||
|
Logger: s.logger,
|
||
|
PanicHandler: s.panicHandler,
|
||
|
SubscribeResolverTimeout: s.subscribeResolverTimeout,
|
||
|
}
|
||
|
varTypes := make(map[string]*introspection.Type)
|
||
|
for _, v := range op.Vars {
|
||
|
t, err := common.ResolveType(v.Type, s.schema.Resolve)
|
||
|
if err != nil {
|
||
|
return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{err}})
|
||
|
}
|
||
|
varTypes[v.Name.Name] = introspection.WrapType(t)
|
||
|
}
|
||
|
|
||
|
if op.Type == query.Query || op.Type == query.Mutation {
|
||
|
data, errs := r.Execute(ctx, res, op)
|
||
|
return sendAndReturnClosed(&Response{Data: data, Errors: errs})
|
||
|
}
|
||
|
|
||
|
responses := r.Subscribe(ctx, res, op)
|
||
|
c := make(chan interface{})
|
||
|
go func() {
|
||
|
for resp := range responses {
|
||
|
c <- &Response{
|
||
|
Data: resp.Data,
|
||
|
Errors: resp.Errors,
|
||
|
}
|
||
|
}
|
||
|
close(c)
|
||
|
}()
|
||
|
|
||
|
return c
|
||
|
}
|
||
|
|
||
|
func sendAndReturnClosed(resp *Response) chan interface{} {
|
||
|
c := make(chan interface{}, 1)
|
||
|
c <- resp
|
||
|
close(c)
|
||
|
return c
|
||
|
}
|