mirror of
https://github.com/42wim/matterbridge.git
synced 2024-11-24 11:42:03 -08:00
384 lines
8.9 KiB
Go
384 lines
8.9 KiB
Go
|
package steam
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"compress/gzip"
|
||
|
"crypto/rand"
|
||
|
"encoding/binary"
|
||
|
"fmt"
|
||
|
"hash/crc32"
|
||
|
"io/ioutil"
|
||
|
"net"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
|
||
|
"github.com/Philipp15b/go-steam/cryptoutil"
|
||
|
"github.com/Philipp15b/go-steam/netutil"
|
||
|
. "github.com/Philipp15b/go-steam/protocol"
|
||
|
. "github.com/Philipp15b/go-steam/protocol/protobuf"
|
||
|
. "github.com/Philipp15b/go-steam/protocol/steamlang"
|
||
|
. "github.com/Philipp15b/go-steam/steamid"
|
||
|
)
|
||
|
|
||
|
// Represents a client to the Steam network.
|
||
|
// Always poll events from the channel returned by Events() or receiving messages will stop.
|
||
|
// All access, unless otherwise noted, should be threadsafe.
|
||
|
//
|
||
|
// When a FatalErrorEvent is emitted, the connection is automatically closed. The same client can be used to reconnect.
|
||
|
// Other errors don't have any effect.
|
||
|
type Client struct {
|
||
|
// these need to be 64 bit aligned for sync/atomic on 32bit
|
||
|
sessionId int32
|
||
|
_ uint32
|
||
|
steamId uint64
|
||
|
currentJobId uint64
|
||
|
|
||
|
Auth *Auth
|
||
|
Social *Social
|
||
|
Web *Web
|
||
|
Notifications *Notifications
|
||
|
Trading *Trading
|
||
|
GC *GameCoordinator
|
||
|
|
||
|
events chan interface{}
|
||
|
handlers []PacketHandler
|
||
|
handlersMutex sync.RWMutex
|
||
|
|
||
|
tempSessionKey []byte
|
||
|
|
||
|
ConnectionTimeout time.Duration
|
||
|
|
||
|
mutex sync.RWMutex // guarding conn and writeChan
|
||
|
conn connection
|
||
|
writeChan chan IMsg
|
||
|
writeBuf *bytes.Buffer
|
||
|
heartbeat *time.Ticker
|
||
|
}
|
||
|
|
||
|
type PacketHandler interface {
|
||
|
HandlePacket(*Packet)
|
||
|
}
|
||
|
|
||
|
func NewClient() *Client {
|
||
|
client := &Client{
|
||
|
events: make(chan interface{}, 3),
|
||
|
writeBuf: new(bytes.Buffer),
|
||
|
}
|
||
|
client.Auth = &Auth{client: client}
|
||
|
client.RegisterPacketHandler(client.Auth)
|
||
|
client.Social = newSocial(client)
|
||
|
client.RegisterPacketHandler(client.Social)
|
||
|
client.Web = &Web{client: client}
|
||
|
client.RegisterPacketHandler(client.Web)
|
||
|
client.Notifications = newNotifications(client)
|
||
|
client.RegisterPacketHandler(client.Notifications)
|
||
|
client.Trading = &Trading{client: client}
|
||
|
client.RegisterPacketHandler(client.Trading)
|
||
|
client.GC = newGC(client)
|
||
|
client.RegisterPacketHandler(client.GC)
|
||
|
return client
|
||
|
}
|
||
|
|
||
|
// Get the event channel. By convention all events are pointers, except for errors.
|
||
|
// It is never closed.
|
||
|
func (c *Client) Events() <-chan interface{} {
|
||
|
return c.events
|
||
|
}
|
||
|
|
||
|
func (c *Client) Emit(event interface{}) {
|
||
|
c.events <- event
|
||
|
}
|
||
|
|
||
|
// Emits a FatalErrorEvent formatted with fmt.Errorf and disconnects.
|
||
|
func (c *Client) Fatalf(format string, a ...interface{}) {
|
||
|
c.Emit(FatalErrorEvent(fmt.Errorf(format, a...)))
|
||
|
c.Disconnect()
|
||
|
}
|
||
|
|
||
|
// Emits an error formatted with fmt.Errorf.
|
||
|
func (c *Client) Errorf(format string, a ...interface{}) {
|
||
|
c.Emit(fmt.Errorf(format, a...))
|
||
|
}
|
||
|
|
||
|
// Registers a PacketHandler that receives all incoming packets.
|
||
|
func (c *Client) RegisterPacketHandler(handler PacketHandler) {
|
||
|
c.handlersMutex.Lock()
|
||
|
defer c.handlersMutex.Unlock()
|
||
|
c.handlers = append(c.handlers, handler)
|
||
|
}
|
||
|
|
||
|
func (c *Client) GetNextJobId() JobId {
|
||
|
return JobId(atomic.AddUint64(&c.currentJobId, 1))
|
||
|
}
|
||
|
|
||
|
func (c *Client) SteamId() SteamId {
|
||
|
return SteamId(atomic.LoadUint64(&c.steamId))
|
||
|
}
|
||
|
|
||
|
func (c *Client) SessionId() int32 {
|
||
|
return atomic.LoadInt32(&c.sessionId)
|
||
|
}
|
||
|
|
||
|
func (c *Client) Connected() bool {
|
||
|
c.mutex.RLock()
|
||
|
defer c.mutex.RUnlock()
|
||
|
return c.conn != nil
|
||
|
}
|
||
|
|
||
|
// Connects to a random Steam server and returns its address.
|
||
|
// If this client is already connected, it is disconnected first.
|
||
|
// This method tries to use an address from the Steam Directory and falls
|
||
|
// back to the built-in server list if the Steam Directory can't be reached.
|
||
|
// If you want to connect to a specific server, use `ConnectTo`.
|
||
|
func (c *Client) Connect() *netutil.PortAddr {
|
||
|
var server *netutil.PortAddr
|
||
|
if steamDirectoryCache.IsInitialized() {
|
||
|
server = steamDirectoryCache.GetRandomCM()
|
||
|
} else {
|
||
|
server = GetRandomCM()
|
||
|
}
|
||
|
c.ConnectTo(server)
|
||
|
return server
|
||
|
}
|
||
|
|
||
|
// Connects to a specific server.
|
||
|
// You may want to use one of the `GetRandom*CM()` functions in this package.
|
||
|
// If this client is already connected, it is disconnected first.
|
||
|
func (c *Client) ConnectTo(addr *netutil.PortAddr) {
|
||
|
c.ConnectToBind(addr, nil)
|
||
|
}
|
||
|
|
||
|
// Connects to a specific server, and binds to a specified local IP
|
||
|
// If this client is already connected, it is disconnected first.
|
||
|
func (c *Client) ConnectToBind(addr *netutil.PortAddr, local *net.TCPAddr) {
|
||
|
c.Disconnect()
|
||
|
|
||
|
conn, err := dialTCP(local, addr.ToTCPAddr())
|
||
|
if err != nil {
|
||
|
c.Fatalf("Connect failed: %v", err)
|
||
|
return
|
||
|
}
|
||
|
c.conn = conn
|
||
|
c.writeChan = make(chan IMsg, 5)
|
||
|
|
||
|
go c.readLoop()
|
||
|
go c.writeLoop()
|
||
|
}
|
||
|
|
||
|
func (c *Client) Disconnect() {
|
||
|
c.mutex.Lock()
|
||
|
defer c.mutex.Unlock()
|
||
|
|
||
|
if c.conn == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
c.conn.Close()
|
||
|
c.conn = nil
|
||
|
if c.heartbeat != nil {
|
||
|
c.heartbeat.Stop()
|
||
|
}
|
||
|
close(c.writeChan)
|
||
|
c.Emit(&DisconnectedEvent{})
|
||
|
|
||
|
}
|
||
|
|
||
|
// Adds a message to the send queue. Modifications to the given message after
|
||
|
// writing are not allowed (possible race conditions).
|
||
|
//
|
||
|
// Writes to this client when not connected are ignored.
|
||
|
func (c *Client) Write(msg IMsg) {
|
||
|
if cm, ok := msg.(IClientMsg); ok {
|
||
|
cm.SetSessionId(c.SessionId())
|
||
|
cm.SetSteamId(c.SteamId())
|
||
|
}
|
||
|
c.mutex.RLock()
|
||
|
defer c.mutex.RUnlock()
|
||
|
if c.conn == nil {
|
||
|
return
|
||
|
}
|
||
|
c.writeChan <- msg
|
||
|
}
|
||
|
|
||
|
func (c *Client) readLoop() {
|
||
|
for {
|
||
|
// This *should* be atomic on most platforms, but the Go spec doesn't guarantee it
|
||
|
c.mutex.RLock()
|
||
|
conn := c.conn
|
||
|
c.mutex.RUnlock()
|
||
|
if conn == nil {
|
||
|
return
|
||
|
}
|
||
|
packet, err := conn.Read()
|
||
|
|
||
|
if err != nil {
|
||
|
c.Fatalf("Error reading from the connection: %v", err)
|
||
|
return
|
||
|
}
|
||
|
c.handlePacket(packet)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *Client) writeLoop() {
|
||
|
for {
|
||
|
c.mutex.RLock()
|
||
|
conn := c.conn
|
||
|
c.mutex.RUnlock()
|
||
|
if conn == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
msg, ok := <-c.writeChan
|
||
|
if !ok {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
err := msg.Serialize(c.writeBuf)
|
||
|
if err != nil {
|
||
|
c.writeBuf.Reset()
|
||
|
c.Fatalf("Error serializing message %v: %v", msg, err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
err = conn.Write(c.writeBuf.Bytes())
|
||
|
|
||
|
c.writeBuf.Reset()
|
||
|
|
||
|
if err != nil {
|
||
|
c.Fatalf("Error writing message %v: %v", msg, err)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *Client) heartbeatLoop(seconds time.Duration) {
|
||
|
if c.heartbeat != nil {
|
||
|
c.heartbeat.Stop()
|
||
|
}
|
||
|
c.heartbeat = time.NewTicker(seconds * time.Second)
|
||
|
for {
|
||
|
_, ok := <-c.heartbeat.C
|
||
|
if !ok {
|
||
|
break
|
||
|
}
|
||
|
c.Write(NewClientMsgProtobuf(EMsg_ClientHeartBeat, new(CMsgClientHeartBeat)))
|
||
|
}
|
||
|
c.heartbeat = nil
|
||
|
}
|
||
|
|
||
|
func (c *Client) handlePacket(packet *Packet) {
|
||
|
switch packet.EMsg {
|
||
|
case EMsg_ChannelEncryptRequest:
|
||
|
c.handleChannelEncryptRequest(packet)
|
||
|
case EMsg_ChannelEncryptResult:
|
||
|
c.handleChannelEncryptResult(packet)
|
||
|
case EMsg_Multi:
|
||
|
c.handleMulti(packet)
|
||
|
case EMsg_ClientCMList:
|
||
|
c.handleClientCMList(packet)
|
||
|
}
|
||
|
|
||
|
c.handlersMutex.RLock()
|
||
|
defer c.handlersMutex.RUnlock()
|
||
|
for _, handler := range c.handlers {
|
||
|
handler.HandlePacket(packet)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *Client) handleChannelEncryptRequest(packet *Packet) {
|
||
|
body := NewMsgChannelEncryptRequest()
|
||
|
packet.ReadMsg(body)
|
||
|
|
||
|
if body.Universe != EUniverse_Public {
|
||
|
c.Fatalf("Invalid univserse %v!", body.Universe)
|
||
|
}
|
||
|
|
||
|
c.tempSessionKey = make([]byte, 32)
|
||
|
rand.Read(c.tempSessionKey)
|
||
|
encryptedKey := cryptoutil.RSAEncrypt(GetPublicKey(EUniverse_Public), c.tempSessionKey)
|
||
|
|
||
|
payload := new(bytes.Buffer)
|
||
|
payload.Write(encryptedKey)
|
||
|
binary.Write(payload, binary.LittleEndian, crc32.ChecksumIEEE(encryptedKey))
|
||
|
payload.WriteByte(0)
|
||
|
payload.WriteByte(0)
|
||
|
payload.WriteByte(0)
|
||
|
payload.WriteByte(0)
|
||
|
|
||
|
c.Write(NewMsg(NewMsgChannelEncryptResponse(), payload.Bytes()))
|
||
|
}
|
||
|
|
||
|
func (c *Client) handleChannelEncryptResult(packet *Packet) {
|
||
|
body := NewMsgChannelEncryptResult()
|
||
|
packet.ReadMsg(body)
|
||
|
|
||
|
if body.Result != EResult_OK {
|
||
|
c.Fatalf("Encryption failed: %v", body.Result)
|
||
|
return
|
||
|
}
|
||
|
c.conn.SetEncryptionKey(c.tempSessionKey)
|
||
|
c.tempSessionKey = nil
|
||
|
|
||
|
c.Emit(&ConnectedEvent{})
|
||
|
}
|
||
|
|
||
|
func (c *Client) handleMulti(packet *Packet) {
|
||
|
body := new(CMsgMulti)
|
||
|
packet.ReadProtoMsg(body)
|
||
|
|
||
|
payload := body.GetMessageBody()
|
||
|
|
||
|
if body.GetSizeUnzipped() > 0 {
|
||
|
r, err := gzip.NewReader(bytes.NewReader(payload))
|
||
|
if err != nil {
|
||
|
c.Errorf("handleMulti: Error while decompressing: %v", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
payload, err = ioutil.ReadAll(r)
|
||
|
if err != nil {
|
||
|
c.Errorf("handleMulti: Error while decompressing: %v", err)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
pr := bytes.NewReader(payload)
|
||
|
for pr.Len() > 0 {
|
||
|
var length uint32
|
||
|
binary.Read(pr, binary.LittleEndian, &length)
|
||
|
packetData := make([]byte, length)
|
||
|
pr.Read(packetData)
|
||
|
p, err := NewPacket(packetData)
|
||
|
if err != nil {
|
||
|
c.Errorf("Error reading packet in Multi msg %v: %v", packet, err)
|
||
|
continue
|
||
|
}
|
||
|
c.handlePacket(p)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *Client) handleClientCMList(packet *Packet) {
|
||
|
body := new(CMsgClientCMList)
|
||
|
packet.ReadProtoMsg(body)
|
||
|
|
||
|
l := make([]*netutil.PortAddr, 0)
|
||
|
for i, ip := range body.GetCmAddresses() {
|
||
|
l = append(l, &netutil.PortAddr{
|
||
|
readIp(ip),
|
||
|
uint16(body.GetCmPorts()[i]),
|
||
|
})
|
||
|
}
|
||
|
|
||
|
c.Emit(&ClientCMListEvent{l})
|
||
|
}
|
||
|
|
||
|
func readIp(ip uint32) net.IP {
|
||
|
r := make(net.IP, 4)
|
||
|
r[3] = byte(ip)
|
||
|
r[2] = byte(ip >> 8)
|
||
|
r[1] = byte(ip >> 16)
|
||
|
r[0] = byte(ip >> 24)
|
||
|
return r
|
||
|
}
|