155 lines
3.2 KiB
Go
155 lines
3.2 KiB
Go
package transport
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
)
|
|
|
|
type Server struct {
|
|
ctx context.Context
|
|
cancel context.CancelCauseFunc
|
|
connectedClients ConnectedClients
|
|
incomingMessages chan IncomingMessage
|
|
}
|
|
|
|
// ErrAlreadyClosed is returned by ReceiveMessage once the server has been
// closed and no further messages can be received.
var ErrAlreadyClosed = fmt.Errorf("server already closed")
|
|
|
|
func NewServer(address string) (*Server, error) {
|
|
ctx, cancel := context.WithCancelCause(context.Background())
|
|
listener, err := net.Listen("tcp", address)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
server := &Server{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
connectedClients: newConnectedClients(),
|
|
incomingMessages: make(chan IncomingMessage, 1024),
|
|
}
|
|
|
|
go (func() {
|
|
for {
|
|
connection, err := listener.Accept()
|
|
if err != nil {
|
|
cancel(err)
|
|
return
|
|
}
|
|
go server.handleConnection(connection)
|
|
}
|
|
})()
|
|
|
|
return server, nil
|
|
}
|
|
|
|
func (server *Server) Close() {
|
|
close(server.incomingMessages)
|
|
server.cancel(nil)
|
|
}
|
|
|
|
func (server *Server) handleConnection(conn net.Conn) {
|
|
defer conn.Close()
|
|
|
|
clientCtx, cancel := context.WithCancelCause(server.ctx)
|
|
outgoingMessages := make(chan OutgoingMessage, 1024)
|
|
|
|
clientId := server.connectedClients.Enroll(func(id ClientId) ConnectedClient {
|
|
return ConnectedClient{
|
|
cancel: cancel,
|
|
outgoingMessages: outgoingMessages,
|
|
}
|
|
})
|
|
defer server.connectedClients.Unenroll(clientId)
|
|
|
|
go (func() {
|
|
<-clientCtx.Done()
|
|
cause := context.Cause(clientCtx)
|
|
log.Printf("client %d done: %s", clientId, cause)
|
|
})()
|
|
|
|
ingoingLines := readLineByLine(conn)
|
|
outgoingLines := bufio.NewWriter(conn)
|
|
|
|
for {
|
|
select {
|
|
case item := <-ingoingLines:
|
|
line := item.Line
|
|
err := item.Error
|
|
|
|
if err != nil {
|
|
cancel(err)
|
|
continue
|
|
}
|
|
|
|
msg, err := Deserialize(line)
|
|
if err != nil {
|
|
cancel(err)
|
|
continue
|
|
}
|
|
if msg == nil {
|
|
continue
|
|
}
|
|
|
|
log.Printf("recv: %v", msg)
|
|
server.incomingMessages <- IncomingMessage{
|
|
Sender: clientId,
|
|
Content: *msg,
|
|
}
|
|
case outgoing := <-outgoingMessages:
|
|
log.Printf("sent: %v", outgoing.Content)
|
|
content, err := outgoing.Content.Serialize()
|
|
if err != nil {
|
|
cancel(err)
|
|
continue
|
|
}
|
|
log.Printf("content: %s", *content)
|
|
|
|
_, err = outgoingLines.WriteString(*content)
|
|
if err != nil {
|
|
cancel(err)
|
|
continue
|
|
}
|
|
|
|
// TODO: Don't flush on every iteration
|
|
err = outgoingLines.Flush()
|
|
if err != nil {
|
|
cancel(err)
|
|
continue
|
|
}
|
|
case <-clientCtx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func (server *Server) ReceiveMessage() (IncomingMessage, error) {
|
|
message, ok := <-server.incomingMessages
|
|
if !ok {
|
|
return IncomingMessage{}, ErrAlreadyClosed
|
|
}
|
|
return message, nil
|
|
}
|
|
|
|
func (server *Server) SendMessage(client ClientId, content Content) {
|
|
outgoing := OutgoingMessage{
|
|
Recipient: client,
|
|
Content: content,
|
|
}
|
|
server.connectedClients.BorrowIfPresent(client, func(connectedClient *ConnectedClient) {
|
|
log.Printf("putting in outgoing")
|
|
connectedClient.outgoingMessages <- outgoing
|
|
log.Printf("done putting in outgoing")
|
|
})
|
|
}
|
|
|
|
func (server *Server) TerminateClient(client ClientId, err error) {
|
|
server.connectedClients.BorrowIfPresent(client, func(connectedClient *ConnectedClient) {
|
|
connectedClient.cancel(err)
|
|
})
|
|
}
|