package transport import ( "bufio" "context" "fmt" "log" "net" ) type Server struct { ctx context.Context cancel context.CancelCauseFunc connectedClients ConnectedClients incomingMessages chan IncomingMessage } var ErrAlreadyClosed = fmt.Errorf("server already closed") func NewServer(address string) (*Server, error) { ctx, cancel := context.WithCancelCause(context.Background()) listener, err := net.Listen("tcp", address) if err != nil { return nil, err } server := &Server{ ctx: ctx, cancel: cancel, connectedClients: newConnectedClients(), incomingMessages: make(chan IncomingMessage, 1024), } go (func() { for { connection, err := listener.Accept() if err != nil { cancel(err) return } go server.handleConnection(connection) } })() return server, nil } func (server *Server) Close() { close(server.incomingMessages) server.cancel(nil) } func (server *Server) handleConnection(conn net.Conn) { defer conn.Close() clientCtx, cancel := context.WithCancelCause(server.ctx) outgoingMessages := make(chan OutgoingMessage, 1024) clientId := server.connectedClients.Enroll(func(id ClientId) ConnectedClient { return ConnectedClient{ cancel: cancel, outgoingMessages: outgoingMessages, } }) defer server.connectedClients.Unenroll(clientId) go (func() { <-clientCtx.Done() cause := context.Cause(clientCtx) log.Printf("client %d done: %s", clientId, cause) })() ingoingLines := readLineByLine(conn) outgoingLines := bufio.NewWriter(conn) for { select { case item := <-ingoingLines: line := item.Line err := item.Error if err != nil { cancel(err) continue } msg, err := Deserialize(line) if err != nil { cancel(err) continue } if msg == nil { continue } log.Printf("recv: %v", msg) server.incomingMessages <- IncomingMessage{ Sender: clientId, Content: *msg, } case outgoing := <-outgoingMessages: log.Printf("sent: %v", outgoing.Content) content, err := outgoing.Content.Serialize() if err != nil { cancel(err) continue } log.Printf("content: %s", *content) _, err = outgoingLines.WriteString(*content) if err != nil { cancel(err) continue } // TODO: Don't flush on every iteration err = outgoingLines.Flush() if err != nil { cancel(err) continue } case <-clientCtx.Done(): return } } } func (server *Server) ReceiveMessage() (IncomingMessage, error) { message, ok := <-server.incomingMessages if !ok { return IncomingMessage{}, ErrAlreadyClosed } return message, nil } func (server *Server) SendMessage(client ClientId, content Content) { outgoing := OutgoingMessage{ Recipient: client, Content: content, } server.connectedClients.BorrowIfPresent(client, func(connectedClient *ConnectedClient) { log.Printf("putting in outgoing") connectedClient.outgoingMessages <- outgoing log.Printf("done putting in outgoing") }) } func (server *Server) TerminateClient(client ClientId, err error) { server.connectedClients.BorrowIfPresent(client, func(connectedClient *ConnectedClient) { connectedClient.cancel(err) }) }