Implementation 1 of a minimalistic IRC server
This commit is contained in:
152
src/transport/server.go
Normal file
152
src/transport/server.go
Normal file
@@ -0,0 +1,152 @@
|
||||
package transport
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelCauseFunc
|
||||
connectedClients ConnectedClients
|
||||
incomingMessages chan IncomingMessage
|
||||
}
|
||||
|
||||
var ErrAlreadyClosed = fmt.Errorf("server already closed")
|
||||
|
||||
func NewServer(address string) (*Server, error) {
|
||||
ctx, cancel := context.WithCancelCause(context.Background())
|
||||
listener, err := net.Listen("tcp", address)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
server := &Server{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
connectedClients: newConnectedClients(),
|
||||
incomingMessages: make(chan IncomingMessage),
|
||||
}
|
||||
|
||||
go (func() {
|
||||
for {
|
||||
connection, err := listener.Accept()
|
||||
if err != nil {
|
||||
cancel(err)
|
||||
return
|
||||
}
|
||||
go server.handleConnection(connection)
|
||||
}
|
||||
})()
|
||||
|
||||
return server, nil
|
||||
}
|
||||
|
||||
func (server *Server) Close() {
|
||||
close(server.incomingMessages)
|
||||
server.cancel(nil)
|
||||
}
|
||||
|
||||
func (server *Server) handleConnection(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
|
||||
clientCtx, cancel := context.WithCancelCause(server.ctx)
|
||||
outgoingMessages := make(chan OutgoingMessage)
|
||||
|
||||
clientId := server.connectedClients.Enroll(func(id ClientId) ConnectedClient {
|
||||
return ConnectedClient{
|
||||
cancel: cancel,
|
||||
outgoingMessages: outgoingMessages,
|
||||
}
|
||||
})
|
||||
defer server.connectedClients.Unenroll(clientId)
|
||||
|
||||
go (func() {
|
||||
<-clientCtx.Done()
|
||||
cause := context.Cause(clientCtx)
|
||||
log.Printf("client %d done: %s", clientId, cause)
|
||||
})()
|
||||
|
||||
ingoingLines := readLineByLine(conn)
|
||||
outgoingLines := bufio.NewWriter(conn)
|
||||
|
||||
for {
|
||||
select {
|
||||
case item := <-ingoingLines:
|
||||
line := item.Line
|
||||
err := item.Error
|
||||
|
||||
if err != nil {
|
||||
cancel(err)
|
||||
continue
|
||||
}
|
||||
|
||||
msg, err := Deserialize(line)
|
||||
if err != nil {
|
||||
cancel(err)
|
||||
continue
|
||||
}
|
||||
if msg == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Printf("recv: %v", msg)
|
||||
server.incomingMessages <- IncomingMessage{
|
||||
Sender: clientId,
|
||||
Content: *msg,
|
||||
}
|
||||
case outgoing := <-outgoingMessages:
|
||||
log.Printf("sent: %v", outgoing.Content)
|
||||
content, err := outgoing.Content.Serialize()
|
||||
if err != nil {
|
||||
cancel(err)
|
||||
continue
|
||||
}
|
||||
log.Printf("content: %s", *content)
|
||||
|
||||
_, err = outgoingLines.WriteString(*content)
|
||||
if err != nil {
|
||||
cancel(err)
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: Don't flush on every iteration
|
||||
err = outgoingLines.Flush()
|
||||
if err != nil {
|
||||
cancel(err)
|
||||
continue
|
||||
}
|
||||
case <-clientCtx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (server *Server) ReceiveMessage() (IncomingMessage, error) {
|
||||
message, ok := <-server.incomingMessages
|
||||
if !ok {
|
||||
return IncomingMessage{}, ErrAlreadyClosed
|
||||
}
|
||||
return message, nil
|
||||
}
|
||||
|
||||
func (server *Server) SendMessage(client ClientId, content Content) {
|
||||
outgoing := OutgoingMessage{
|
||||
Recipient: client,
|
||||
Content: content,
|
||||
}
|
||||
server.connectedClients.BorrowIfPresent(client, func(connectedClient *ConnectedClient) {
|
||||
connectedClient.outgoingMessages <- outgoing
|
||||
})
|
||||
}
|
||||
|
||||
func (server *Server) TerminateClient(client ClientId, err error) {
|
||||
server.connectedClients.BorrowIfPresent(client, func(connectedClient *ConnectedClient) {
|
||||
connectedClient.cancel(err)
|
||||
})
|
||||
}
|
Reference in New Issue
Block a user