package graphql

import (
	"context"
	"errors"

	qerrors "github.com/graph-gophers/graphql-go/errors"
	"github.com/graph-gophers/graphql-go/internal/common"
	"github.com/graph-gophers/graphql-go/internal/exec"
	"github.com/graph-gophers/graphql-go/internal/exec/resolvable"
	"github.com/graph-gophers/graphql-go/internal/exec/selected"
	"github.com/graph-gophers/graphql-go/internal/query"
	"github.com/graph-gophers/graphql-go/internal/validation"
	"github.com/graph-gophers/graphql-go/introspection"
)

// Subscribe returns a response channel for the given subscription with the schema's
// resolver. It returns an error if the schema was created without a resolver.
// If the context gets cancelled, the response channel will be closed and no
// further resolvers will be called. The context error will be returned as soon
// as possible (not immediately).
func (s *Schema) Subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}) (<-chan interface{}, error) {
	if !s.res.Resolver.IsValid() {
		return nil, errors.New("schema created without resolver, can not subscribe")
	}
	if _, ok := s.schema.EntryPoints["subscription"]; !ok {
		return nil, errors.New("no subscriptions are offered by the schema")
	}
	return s.subscribe(ctx, queryString, operationName, variables, s.res), nil
}

func (s *Schema) subscribe(ctx context.Context, queryString string, operationName string, variables map[string]interface{}, res *resolvable.Schema) <-chan interface{} {
	doc, qErr := query.Parse(queryString)
	if qErr != nil {
		return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{qErr}})
	}

	validationFinish := s.validationTracer.TraceValidation(ctx)
	errs := validation.Validate(s.schema, doc, variables, s.maxDepth)
	validationFinish(errs)
	if len(errs) != 0 {
		return sendAndReturnClosed(&Response{Errors: errs})
	}

	op, err := getOperation(doc, operationName)
	if err != nil {
		return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{qerrors.Errorf("%s", err)}})
	}

	r := &exec.Request{
		Request: selected.Request{
			Doc:    doc,
			Vars:   variables,
			Schema: s.schema,
		},
		Limiter:                  make(chan struct{}, s.maxParallelism),
		Tracer:                   s.tracer,
		Logger:                   s.logger,
		PanicHandler:             s.panicHandler,
		SubscribeResolverTimeout: s.subscribeResolverTimeout,
	}
	varTypes := make(map[string]*introspection.Type)
	for _, v := range op.Vars {
		t, err := common.ResolveType(v.Type, s.schema.Resolve)
		if err != nil {
			return sendAndReturnClosed(&Response{Errors: []*qerrors.QueryError{err}})
		}
		varTypes[v.Name.Name] = introspection.WrapType(t)
	}

	if op.Type == query.Query || op.Type == query.Mutation {
		data, errs := r.Execute(ctx, res, op)
		return sendAndReturnClosed(&Response{Data: data, Errors: errs})
	}

	responses := r.Subscribe(ctx, res, op)
	c := make(chan interface{})
	go func() {
		for resp := range responses {
			c <- &Response{
				Data:   resp.Data,
				Errors: resp.Errors,
			}
		}
		close(c)
	}()

	return c
}

func sendAndReturnClosed(resp *Response) chan interface{} {
	c := make(chan interface{}, 1)
	c <- resp
	close(c)
	return c
}