mirror of
https://github.com/42wim/matterbridge.git
synced 2024-12-18 06:52:01 -08:00
634 lines
16 KiB
Go
634 lines
16 KiB
Go
package kbchat
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"os"
|
|
"os/exec"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/keybase/go-keybase-chat-bot/kbchat/types/chat1"
|
|
"github.com/keybase/go-keybase-chat-bot/kbchat/types/keybase1"
|
|
"github.com/keybase/go-keybase-chat-bot/kbchat/types/stellar1"
|
|
)
|
|
|
|
// SubscriptionMessage contains a message and conversation object
|
|
type SubscriptionMessage struct {
|
|
Message chat1.MsgSummary
|
|
Conversation chat1.ConvSummary
|
|
}
|
|
|
|
type SubscriptionConversation struct {
|
|
Conversation chat1.ConvSummary
|
|
}
|
|
|
|
type SubscriptionWalletEvent struct {
|
|
Payment stellar1.PaymentDetailsLocal
|
|
}
|
|
|
|
// Subscription has methods to control the background message fetcher loop
|
|
type Subscription struct {
|
|
*DebugOutput
|
|
sync.Mutex
|
|
|
|
newMsgsCh chan SubscriptionMessage
|
|
newConvsCh chan SubscriptionConversation
|
|
newWalletCh chan SubscriptionWalletEvent
|
|
errorCh chan error
|
|
running bool
|
|
shutdownCh chan struct{}
|
|
}
|
|
|
|
func NewSubscription() *Subscription {
|
|
newMsgsCh := make(chan SubscriptionMessage, 250)
|
|
newConvsCh := make(chan SubscriptionConversation, 250)
|
|
newWalletCh := make(chan SubscriptionWalletEvent, 250)
|
|
errorCh := make(chan error, 250)
|
|
shutdownCh := make(chan struct{})
|
|
return &Subscription{
|
|
DebugOutput: NewDebugOutput("Subscription"),
|
|
newMsgsCh: newMsgsCh,
|
|
newConvsCh: newConvsCh,
|
|
newWalletCh: newWalletCh,
|
|
shutdownCh: shutdownCh,
|
|
errorCh: errorCh,
|
|
running: true,
|
|
}
|
|
}
|
|
|
|
// Read blocks until a new message arrives
|
|
func (m *Subscription) Read() (msg SubscriptionMessage, err error) {
|
|
defer m.Trace(&err, "Read")()
|
|
select {
|
|
case msg = <-m.newMsgsCh:
|
|
return msg, nil
|
|
case err = <-m.errorCh:
|
|
return SubscriptionMessage{}, err
|
|
case <-m.shutdownCh:
|
|
return SubscriptionMessage{}, errors.New("Subscription shutdown")
|
|
}
|
|
}
|
|
|
|
func (m *Subscription) ReadNewConvs() (conv SubscriptionConversation, err error) {
|
|
defer m.Trace(&err, "ReadNewConvs")()
|
|
select {
|
|
case conv = <-m.newConvsCh:
|
|
return conv, nil
|
|
case err = <-m.errorCh:
|
|
return SubscriptionConversation{}, err
|
|
case <-m.shutdownCh:
|
|
return SubscriptionConversation{}, errors.New("Subscription shutdown")
|
|
}
|
|
}
|
|
|
|
// Read blocks until a new message arrives
|
|
func (m *Subscription) ReadWallet() (msg SubscriptionWalletEvent, err error) {
|
|
defer m.Trace(&err, "ReadWallet")()
|
|
select {
|
|
case msg = <-m.newWalletCh:
|
|
return msg, nil
|
|
case err = <-m.errorCh:
|
|
return SubscriptionWalletEvent{}, err
|
|
case <-m.shutdownCh:
|
|
return SubscriptionWalletEvent{}, errors.New("Subscription shutdown")
|
|
}
|
|
}
|
|
|
|
// Shutdown terminates the background process
|
|
func (m *Subscription) Shutdown() {
|
|
defer m.Trace(nil, "Shutdown")()
|
|
m.Lock()
|
|
defer m.Unlock()
|
|
if m.running {
|
|
close(m.shutdownCh)
|
|
m.running = false
|
|
}
|
|
}
|
|
|
|
type ListenOptions struct {
|
|
Wallet bool
|
|
Convs bool
|
|
}
|
|
|
|
type PaymentHolder struct {
|
|
Payment stellar1.PaymentDetailsLocal `json:"notification"`
|
|
}
|
|
|
|
type TypeHolder struct {
|
|
Type string `json:"type"`
|
|
}
|
|
|
|
type OneshotOptions struct {
|
|
Username string
|
|
PaperKey string
|
|
}
|
|
|
|
type RunOptions struct {
|
|
KeybaseLocation string
|
|
HomeDir string
|
|
Oneshot *OneshotOptions
|
|
StartService bool
|
|
// Have the bot send/receive typing notifications
|
|
EnableTyping bool
|
|
// Disable bot lite mode
|
|
DisableBotLiteMode bool
|
|
// Number of processes to spin up to connect to the keybase service
|
|
NumPipes int
|
|
}
|
|
|
|
func (r RunOptions) Location() string {
|
|
if r.KeybaseLocation == "" {
|
|
return "keybase"
|
|
}
|
|
return r.KeybaseLocation
|
|
}
|
|
|
|
func (r RunOptions) Command(args ...string) *exec.Cmd {
|
|
var cmd []string
|
|
if r.HomeDir != "" {
|
|
cmd = append(cmd, "--home", r.HomeDir)
|
|
}
|
|
cmd = append(cmd, args...)
|
|
return exec.Command(r.Location(), cmd...)
|
|
}
|
|
|
|
// Start fires up the Keybase JSON API in stdin/stdout mode
|
|
func Start(runOpts RunOptions, opts ...func(*API)) (*API, error) {
|
|
api := NewAPI(runOpts, opts...)
|
|
if err := api.startPipes(); err != nil {
|
|
return nil, err
|
|
}
|
|
return api, nil
|
|
}
|
|
|
|
type apiPipe struct {
|
|
sync.Mutex
|
|
input io.Writer
|
|
output *bufio.Reader
|
|
cmd *exec.Cmd
|
|
}
|
|
|
|
// API is the main object used for communicating with the Keybase JSON API
|
|
type API struct {
|
|
sync.Mutex
|
|
*DebugOutput
|
|
// Round robin hand out API pipes to allow concurrent API requests.
|
|
pipeIdx int
|
|
pipes []*apiPipe
|
|
username string
|
|
runOpts RunOptions
|
|
subscriptions []*Subscription
|
|
Timeout time.Duration
|
|
LogSendBytes int
|
|
}
|
|
|
|
func CustomTimeout(timeout time.Duration) func(*API) {
|
|
return func(a *API) {
|
|
a.Timeout = timeout
|
|
}
|
|
}
|
|
|
|
func NewAPI(runOpts RunOptions, opts ...func(*API)) *API {
|
|
api := &API{
|
|
DebugOutput: NewDebugOutput("API"),
|
|
runOpts: runOpts,
|
|
Timeout: 5 * time.Second,
|
|
LogSendBytes: 1024 * 1024 * 5, // request 5MB so we don't get killed
|
|
}
|
|
for _, opt := range opts {
|
|
opt(api)
|
|
}
|
|
return api
|
|
}
|
|
|
|
func (a *API) Command(args ...string) *exec.Cmd {
|
|
return a.runOpts.Command(args...)
|
|
}
|
|
|
|
func (a *API) getUsername(runOpts RunOptions) (username string, err error) {
|
|
p := runOpts.Command("whoami", "-json")
|
|
output, err := p.StdoutPipe()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if runtime.GOOS != "windows" {
|
|
p.ExtraFiles = []*os.File{output.(*os.File)}
|
|
}
|
|
if err = p.Start(); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
doneCh := make(chan error)
|
|
go func() {
|
|
defer func() { close(doneCh) }()
|
|
statusJSON, err := io.ReadAll(output)
|
|
if err != nil {
|
|
doneCh <- fmt.Errorf("error reading whoami output: %v", err)
|
|
return
|
|
}
|
|
var status keybase1.CurrentStatus
|
|
if err := json.Unmarshal(statusJSON, &status); err != nil {
|
|
doneCh <- fmt.Errorf("invalid whoami JSON %q: %v", statusJSON, err)
|
|
return
|
|
}
|
|
if status.LoggedIn && status.User != nil {
|
|
username = status.User.Username
|
|
doneCh <- nil
|
|
} else {
|
|
doneCh <- fmt.Errorf("unable to authenticate to keybase service: logged in: %v user: %+v", status.LoggedIn, status.User)
|
|
}
|
|
// Cleanup the command
|
|
if err := p.Wait(); err != nil {
|
|
a.Debug("unable to wait for cmd: %v", err)
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case err = <-doneCh:
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
case <-time.After(a.Timeout):
|
|
return "", errors.New("unable to run Keybase command")
|
|
}
|
|
|
|
return username, nil
|
|
}
|
|
|
|
func (a *API) auth() (string, error) {
|
|
username, err := a.getUsername(a.runOpts)
|
|
if err == nil {
|
|
return username, nil
|
|
}
|
|
if a.runOpts.Oneshot == nil {
|
|
return "", err
|
|
}
|
|
username = ""
|
|
// If a paper key is specified, then login with oneshot mode (logout first)
|
|
if a.runOpts.Oneshot != nil {
|
|
if username == a.runOpts.Oneshot.Username {
|
|
// just get out if we are on the desired user already
|
|
return username, nil
|
|
}
|
|
if err := a.runOpts.Command("logout", "-f").Run(); err != nil {
|
|
return "", err
|
|
}
|
|
if err := a.runOpts.Command("oneshot", "--username", a.runOpts.Oneshot.Username, "--paperkey",
|
|
a.runOpts.Oneshot.PaperKey).Run(); err != nil {
|
|
return "", err
|
|
}
|
|
username = a.runOpts.Oneshot.Username
|
|
return username, nil
|
|
}
|
|
return "", errors.New("unable to auth")
|
|
}
|
|
|
|
func (a *API) startPipes() (err error) {
|
|
a.Lock()
|
|
defer a.Unlock()
|
|
for _, pipe := range a.pipes {
|
|
if pipe.cmd != nil {
|
|
if err := pipe.cmd.Process.Kill(); err != nil {
|
|
return fmt.Errorf("unable to kill previous API command %v", err)
|
|
}
|
|
}
|
|
pipe.cmd = nil
|
|
}
|
|
a.pipes = nil
|
|
|
|
if a.runOpts.StartService {
|
|
args := []string{fmt.Sprintf("-enable-bot-lite-mode=%v", a.runOpts.DisableBotLiteMode), "service"}
|
|
if err := a.runOpts.Command(args...).Start(); err != nil {
|
|
return fmt.Errorf("unable to start service %v", err)
|
|
}
|
|
}
|
|
|
|
if a.username, err = a.auth(); err != nil {
|
|
return fmt.Errorf("unable to auth: %v", err)
|
|
}
|
|
|
|
cmd := a.runOpts.Command("chat", "notification-settings", fmt.Sprintf("-disable-typing=%v", !a.runOpts.EnableTyping))
|
|
if err = cmd.Run(); err != nil {
|
|
// This is a performance optimization but isn't a fatal error.
|
|
a.Debug("unable to set notifiation settings %v", err)
|
|
}
|
|
|
|
// Startup NumPipes processes to the keybase chat api
|
|
for i := 0; i < int(math.Max(float64(a.runOpts.NumPipes), 1)); i++ {
|
|
pipe := apiPipe{}
|
|
pipe.cmd = a.runOpts.Command("chat", "api")
|
|
if pipe.input, err = pipe.cmd.StdinPipe(); err != nil {
|
|
return fmt.Errorf("unable to get api stdin: %v", err)
|
|
}
|
|
output, err := pipe.cmd.StdoutPipe()
|
|
if err != nil {
|
|
return fmt.Errorf("unable to get api stdout: %v", err)
|
|
}
|
|
if runtime.GOOS != "windows" {
|
|
pipe.cmd.ExtraFiles = []*os.File{output.(*os.File)}
|
|
}
|
|
if err := pipe.cmd.Start(); err != nil {
|
|
return fmt.Errorf("unable to run chat api cmd: %v", err)
|
|
}
|
|
pipe.output = bufio.NewReader(output)
|
|
a.pipes = append(a.pipes, &pipe)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *API) getAPIPipes() (*apiPipe, error) {
|
|
a.Lock()
|
|
defer a.Unlock()
|
|
idx := a.pipeIdx % len(a.pipes)
|
|
a.pipeIdx++
|
|
pipe := a.pipes[idx]
|
|
if pipe.cmd == nil {
|
|
return nil, errAPIDisconnected
|
|
}
|
|
return pipe, nil
|
|
}
|
|
|
|
func (a *API) GetUsername() string {
|
|
return a.username
|
|
}
|
|
|
|
func (a *API) doSend(arg interface{}) (resp SendResponse, err error) {
|
|
bArg, err := json.Marshal(arg)
|
|
if err != nil {
|
|
return SendResponse{}, fmt.Errorf("unable to send arg: %+v: %v", arg, err)
|
|
}
|
|
pipe, err := a.getAPIPipes()
|
|
if err != nil {
|
|
return SendResponse{}, err
|
|
}
|
|
pipe.Lock()
|
|
defer pipe.Unlock()
|
|
|
|
if _, err := io.WriteString(pipe.input, string(bArg)); err != nil {
|
|
return SendResponse{}, err
|
|
}
|
|
responseRaw, err := pipe.output.ReadBytes('\n')
|
|
if err != nil {
|
|
return SendResponse{}, err
|
|
}
|
|
if err := json.Unmarshal(responseRaw, &resp); err != nil {
|
|
return resp, fmt.Errorf("failed to decode API response: %v %v", responseRaw, err)
|
|
} else if resp.Error != nil {
|
|
return resp, errors.New(resp.Error.Message)
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (a *API) doFetch(apiInput string) ([]byte, error) {
|
|
pipe, err := a.getAPIPipes()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pipe.Lock()
|
|
defer pipe.Unlock()
|
|
|
|
if _, err := io.WriteString(pipe.input, apiInput); err != nil {
|
|
return nil, err
|
|
}
|
|
byteOutput, err := pipe.output.ReadBytes('\n')
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return byteOutput, nil
|
|
}
|
|
|
|
// ListenForNewTextMessages proxies to Listen without wallet events
|
|
func (a *API) ListenForNewTextMessages() (*Subscription, error) {
|
|
opts := ListenOptions{Wallet: false}
|
|
return a.Listen(opts)
|
|
}
|
|
|
|
func (a *API) registerSubscription(sub *Subscription) {
|
|
a.Lock()
|
|
defer a.Unlock()
|
|
a.subscriptions = append(a.subscriptions, sub)
|
|
}
|
|
|
|
// Listen fires of a background loop and puts chat messages and wallet
|
|
// events into channels
|
|
func (a *API) Listen(opts ListenOptions) (*Subscription, error) {
|
|
done := make(chan struct{})
|
|
sub := NewSubscription()
|
|
a.registerSubscription(sub)
|
|
pause := 2 * time.Second
|
|
readScanner := func(boutput *bufio.Scanner) {
|
|
defer func() { done <- struct{}{} }()
|
|
for {
|
|
select {
|
|
case <-sub.shutdownCh:
|
|
a.Debug("readScanner: received shutdown")
|
|
return
|
|
default:
|
|
}
|
|
boutput.Scan()
|
|
t := boutput.Text()
|
|
submitErr := func(err error) {
|
|
if len(sub.errorCh)*2 > cap(sub.errorCh) {
|
|
a.Debug("large errorCh queue: len: %d cap: %d ", len(sub.errorCh), cap(sub.errorCh))
|
|
}
|
|
sub.errorCh <- err
|
|
}
|
|
var typeHolder TypeHolder
|
|
if err := json.Unmarshal([]byte(t), &typeHolder); err != nil {
|
|
submitErr(fmt.Errorf("err: %v, data: %v", err, t))
|
|
break
|
|
}
|
|
switch typeHolder.Type {
|
|
case "chat":
|
|
var notification chat1.MsgNotification
|
|
if err := json.Unmarshal([]byte(t), ¬ification); err != nil {
|
|
submitErr(fmt.Errorf("err: %v, data: %v", err, t))
|
|
break
|
|
}
|
|
if notification.Error != nil {
|
|
a.Debug("error message received: %s", *notification.Error)
|
|
} else if notification.Msg != nil {
|
|
subscriptionMessage := SubscriptionMessage{
|
|
Message: *notification.Msg,
|
|
Conversation: chat1.ConvSummary{
|
|
Id: notification.Msg.ConvID,
|
|
Channel: notification.Msg.Channel,
|
|
},
|
|
}
|
|
if len(sub.newMsgsCh)*2 > cap(sub.newMsgsCh) {
|
|
a.Debug("large newMsgsCh queue: len: %d cap: %d ", len(sub.newMsgsCh), cap(sub.newMsgsCh))
|
|
}
|
|
sub.newMsgsCh <- subscriptionMessage
|
|
}
|
|
case "chat_conv":
|
|
var notification chat1.ConvNotification
|
|
if err := json.Unmarshal([]byte(t), ¬ification); err != nil {
|
|
submitErr(fmt.Errorf("err: %v, data: %v", err, t))
|
|
break
|
|
}
|
|
if notification.Error != nil {
|
|
a.Debug("error message received: %s", *notification.Error)
|
|
} else if notification.Conv != nil {
|
|
subscriptionConv := SubscriptionConversation{
|
|
Conversation: *notification.Conv,
|
|
}
|
|
if len(sub.newConvsCh)*2 > cap(sub.newConvsCh) {
|
|
a.Debug("large newConvsCh queue: len: %d cap: %d ", len(sub.newConvsCh), cap(sub.newConvsCh))
|
|
}
|
|
sub.newConvsCh <- subscriptionConv
|
|
}
|
|
case "wallet":
|
|
var holder PaymentHolder
|
|
if err := json.Unmarshal([]byte(t), &holder); err != nil {
|
|
submitErr(fmt.Errorf("err: %v, data: %v", err, t))
|
|
break
|
|
}
|
|
subscriptionPayment := SubscriptionWalletEvent(holder)
|
|
if len(sub.newWalletCh)*2 > cap(sub.newWalletCh) {
|
|
a.Debug("large newWalletCh queue: len: %d cap: %d ", len(sub.newWalletCh), cap(sub.newWalletCh))
|
|
}
|
|
sub.newWalletCh <- subscriptionPayment
|
|
default:
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
attempts := 0
|
|
maxAttempts := 30
|
|
go func() {
|
|
defer func() {
|
|
close(sub.newMsgsCh)
|
|
close(sub.newConvsCh)
|
|
close(sub.newWalletCh)
|
|
close(sub.errorCh)
|
|
}()
|
|
for {
|
|
select {
|
|
case <-sub.shutdownCh:
|
|
a.Debug("Listen: received shutdown")
|
|
return
|
|
default:
|
|
}
|
|
|
|
if attempts >= maxAttempts {
|
|
if err := a.LogSend("Listen: failed to auth, giving up"); err != nil {
|
|
a.Debug("Listen: logsend failed to send: %v", err)
|
|
}
|
|
panic("Listen: failed to auth, giving up")
|
|
}
|
|
attempts++
|
|
if _, err := a.auth(); err != nil {
|
|
a.Debug("Listen: failed to auth: %s", err)
|
|
time.Sleep(pause)
|
|
continue
|
|
}
|
|
cmdElements := []string{"chat", "api-listen"}
|
|
if opts.Wallet {
|
|
cmdElements = append(cmdElements, "--wallet")
|
|
}
|
|
if opts.Convs {
|
|
cmdElements = append(cmdElements, "--convs")
|
|
}
|
|
p := a.runOpts.Command(cmdElements...)
|
|
output, err := p.StdoutPipe()
|
|
if err != nil {
|
|
a.Debug("Listen: failed to listen: %s", err)
|
|
time.Sleep(pause)
|
|
continue
|
|
}
|
|
stderr, err := p.StderrPipe()
|
|
if err != nil {
|
|
a.Debug("Listen: failed to listen to stderr: %s", err)
|
|
time.Sleep(pause)
|
|
continue
|
|
}
|
|
if runtime.GOOS != "windows" {
|
|
p.ExtraFiles = []*os.File{stderr.(*os.File), output.(*os.File)}
|
|
}
|
|
boutput := bufio.NewScanner(output)
|
|
if err := p.Start(); err != nil {
|
|
a.Debug("Listen: failed to make listen scanner: %s", err)
|
|
time.Sleep(pause)
|
|
continue
|
|
}
|
|
attempts = 0
|
|
go readScanner(boutput)
|
|
select {
|
|
case <-sub.shutdownCh:
|
|
a.Debug("Listen: received shutdown")
|
|
return
|
|
case <-done:
|
|
}
|
|
if err := p.Wait(); err != nil {
|
|
stderrBytes, rerr := io.ReadAll(stderr)
|
|
if rerr != nil {
|
|
stderrBytes = []byte(fmt.Sprintf("failed to get stderr: %v", rerr))
|
|
}
|
|
a.Debug("Listen: failed to Wait for command, restarting pipes: %s (```%s```)", err, stderrBytes)
|
|
if err := a.startPipes(); err != nil {
|
|
a.Debug("Listen: failed to restart pipes: %v", err)
|
|
}
|
|
}
|
|
time.Sleep(pause)
|
|
}
|
|
}()
|
|
return sub, nil
|
|
}
|
|
|
|
func (a *API) LogSend(feedback string) error {
|
|
feedback = "go-keybase-chat-bot log send\n" +
|
|
"username: " + a.GetUsername() + "\n" +
|
|
feedback
|
|
|
|
args := []string{
|
|
"log", "send",
|
|
"--no-confirm",
|
|
"--feedback", feedback,
|
|
"-n", fmt.Sprintf("%d", a.LogSendBytes),
|
|
}
|
|
return a.runOpts.Command(args...).Run()
|
|
}
|
|
|
|
func (a *API) Shutdown() (err error) {
|
|
defer a.Trace(&err, "Shutdown")()
|
|
a.Lock()
|
|
defer a.Unlock()
|
|
for _, sub := range a.subscriptions {
|
|
sub.Shutdown()
|
|
}
|
|
for _, pipe := range a.pipes {
|
|
if pipe.cmd != nil {
|
|
a.Debug("waiting for API command")
|
|
if err := pipe.cmd.Wait(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
if a.runOpts.Oneshot != nil {
|
|
a.Debug("logging out")
|
|
err := a.runOpts.Command("logout", "--force").Run()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if a.runOpts.StartService {
|
|
a.Debug("stopping service")
|
|
err := a.runOpts.Command("ctl", "stop", "--shutdown").Run()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|