LibGUI/WindowServer: Rework packet serialization
Instead of sending while serializing (what even was that), we serialize the whole packet into a buffer which can be sent in one go. First of all this reduces the number of sends by a lot. This also fixes WindowServer ending up sending partial packets when client is not responsive. Previously we would just try sending once, if any send failed the send was aborted while partial packet was already transmitted. This lead to packet stream being out of sync leading to the client killing itself. Now we allow 64 KiB outgoing buffer per client. If this buffer ever fills up, we will not send partial packets.
This commit is contained in:
@@ -145,6 +145,8 @@ int open_server_fd()
|
||||
return server_fd;
|
||||
}
|
||||
|
||||
int g_epoll_fd = -1;
|
||||
|
||||
int main()
|
||||
{
|
||||
srand(time(nullptr));
|
||||
@@ -157,8 +159,8 @@ int main()
|
||||
return 1;
|
||||
}
|
||||
|
||||
int epoll_fd = epoll_create1(EPOLL_CLOEXEC);
|
||||
if (epoll_fd == -1)
|
||||
g_epoll_fd = epoll_create1(EPOLL_CLOEXEC);
|
||||
if (g_epoll_fd == -1)
|
||||
{
|
||||
dwarnln("epoll_create1: {}", strerror(errno));
|
||||
return 1;
|
||||
@@ -169,7 +171,7 @@ int main()
|
||||
.events = EPOLLIN,
|
||||
.data = { .fd = server_fd },
|
||||
};
|
||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1)
|
||||
if (epoll_ctl(g_epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1)
|
||||
{
|
||||
dwarnln("epoll_ctl server: {}", strerror(errno));
|
||||
return 1;
|
||||
@@ -214,7 +216,7 @@ int main()
|
||||
.events = EPOLLIN,
|
||||
.data = { .fd = keyboard_fd },
|
||||
};
|
||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, keyboard_fd, &event) == -1)
|
||||
if (epoll_ctl(g_epoll_fd, EPOLL_CTL_ADD, keyboard_fd, &event) == -1)
|
||||
{
|
||||
dwarnln("epoll_ctl keyboard: {}", strerror(errno));
|
||||
close(keyboard_fd);
|
||||
@@ -231,7 +233,7 @@ int main()
|
||||
.events = EPOLLIN,
|
||||
.data = { .fd = mouse_fd },
|
||||
};
|
||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, mouse_fd, &event) == -1)
|
||||
if (epoll_ctl(g_epoll_fd, EPOLL_CTL_ADD, mouse_fd, &event) == -1)
|
||||
{
|
||||
dwarnln("epoll_ctl mouse: {}", strerror(errno));
|
||||
close(mouse_fd);
|
||||
@@ -283,7 +285,7 @@ int main()
|
||||
timeout.tv_nsec = (sync_interval_us - (current_us - last_sync_us)) * 1000;
|
||||
|
||||
epoll_event events[16];
|
||||
int epoll_events = epoll_pwait2(epoll_fd, events, 16, &timeout, nullptr);
|
||||
int epoll_events = epoll_pwait2(g_epoll_fd, events, 16, &timeout, nullptr);
|
||||
if (epoll_events == -1 && errno != EINTR)
|
||||
{
|
||||
dwarnln("epoll_pwait2: {}", strerror(errno));
|
||||
@@ -296,25 +298,28 @@ int main()
|
||||
{
|
||||
ASSERT(events[i].events & EPOLLIN);
|
||||
|
||||
int window_fd = accept4(server_fd, nullptr, nullptr, SOCK_NONBLOCK | SOCK_CLOEXEC);
|
||||
if (window_fd == -1)
|
||||
int client_fd = accept4(server_fd, nullptr, nullptr, SOCK_NONBLOCK | SOCK_CLOEXEC);
|
||||
if (client_fd == -1)
|
||||
{
|
||||
dwarnln("accept: {}", strerror(errno));
|
||||
continue;
|
||||
}
|
||||
|
||||
epoll_event event {
|
||||
.events = EPOLLIN,
|
||||
.data = { .fd = window_fd },
|
||||
};
|
||||
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, window_fd, &event) == -1)
|
||||
epoll_event event { .events = EPOLLIN, .data = { .fd = client_fd } };
|
||||
if (epoll_ctl(g_epoll_fd, EPOLL_CTL_ADD, client_fd, &event) == -1)
|
||||
{
|
||||
dwarnln("epoll_ctl: {}", strerror(errno));
|
||||
close(window_fd);
|
||||
close(client_fd);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (auto ret = window_server.add_client_fd(client_fd); ret.is_error())
|
||||
{
|
||||
dwarnln("add_client: {}", ret.error());
|
||||
close(client_fd);
|
||||
continue;
|
||||
}
|
||||
|
||||
window_server.add_client_fd(window_fd);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -361,99 +366,127 @@ int main()
|
||||
}
|
||||
|
||||
const int client_fd = events[i].data.fd;
|
||||
if (events[i].events & EPOLLHUP)
|
||||
if (events[i].events & (EPOLLHUP | EPOLLERR))
|
||||
{
|
||||
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
|
||||
epoll_ctl(g_epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
|
||||
window_server.remove_client_fd(client_fd);
|
||||
continue;
|
||||
}
|
||||
|
||||
ASSERT(events[i].events & EPOLLIN);
|
||||
|
||||
auto& client_data = window_server.get_client_data(client_fd);
|
||||
|
||||
if (client_data.packet_buffer.empty())
|
||||
if (events[i].events & EPOLLOUT)
|
||||
{
|
||||
uint32_t packet_size;
|
||||
const ssize_t nrecv = recv(client_fd, &packet_size, sizeof(uint32_t), 0);
|
||||
if (nrecv < 0)
|
||||
dwarnln("recv 1: {}", strerror(errno));
|
||||
if (nrecv > 0 && nrecv != sizeof(uint32_t))
|
||||
dwarnln("could not read packet size with a single recv call, closing connection...");
|
||||
if (nrecv != sizeof(uint32_t))
|
||||
ASSERT(client_data.out_buffer_size > 0);
|
||||
|
||||
const ssize_t nsend = send(client_fd, client_data.out_buffer.data(), client_data.out_buffer_size, 0);
|
||||
if (nsend < 0 && !(errno == EWOULDBLOCK || errno == EAGAIN))
|
||||
{
|
||||
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
|
||||
dwarnln("send: {}", strerror(errno));
|
||||
epoll_ctl(g_epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
|
||||
window_server.remove_client_fd(client_fd);
|
||||
break;
|
||||
}
|
||||
|
||||
if (packet_size < 4)
|
||||
if (nsend > 0)
|
||||
{
|
||||
dwarnln("client sent invalid packet, closing connection...");
|
||||
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
|
||||
window_server.remove_client_fd(client_fd);
|
||||
break;
|
||||
client_data.out_buffer_size -= nsend;
|
||||
if (client_data.out_buffer_size == 0)
|
||||
{
|
||||
epoll_event event { .events = EPOLLIN, .data = { .fd = client_fd } };
|
||||
if (epoll_ctl(g_epoll_fd, EPOLL_CTL_MOD, client_fd, &event) == -1)
|
||||
dwarnln("epoll_ctl remove EPOLLOUT: {}", strerror(errno));
|
||||
}
|
||||
else
|
||||
{
|
||||
// TODO: maybe use a ring buffer so we don't have to memmove everything not sent
|
||||
memmove(
|
||||
client_data.out_buffer.data(),
|
||||
client_data.out_buffer.data() + nsend,
|
||||
client_data.out_buffer_size
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// this is a bit harsh, but i don't want to work on skipping streaming packets
|
||||
if (client_data.packet_buffer.resize(packet_size).is_error())
|
||||
{
|
||||
dwarnln("could not allocate memory for client packet, closing connection...");
|
||||
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
|
||||
window_server.remove_client_fd(client_fd);
|
||||
break;
|
||||
}
|
||||
|
||||
client_data.packet_buffer_nread = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
const ssize_t nrecv = recv(
|
||||
client_fd,
|
||||
client_data.packet_buffer.data() + client_data.packet_buffer_nread,
|
||||
client_data.packet_buffer.size() - client_data.packet_buffer_nread,
|
||||
0
|
||||
);
|
||||
if (nrecv < 0)
|
||||
dwarnln("recv 2: {}", strerror(errno));
|
||||
if (nrecv <= 0)
|
||||
{
|
||||
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
|
||||
window_server.remove_client_fd(client_fd);
|
||||
break;
|
||||
}
|
||||
|
||||
client_data.packet_buffer_nread += nrecv;
|
||||
if (client_data.packet_buffer_nread < client_data.packet_buffer.size())
|
||||
if (!(events[i].events & EPOLLIN))
|
||||
continue;
|
||||
|
||||
ASSERT(client_data.packet_buffer.size() >= sizeof(uint32_t));
|
||||
|
||||
switch (*reinterpret_cast<LibGUI::PacketType*>(client_data.packet_buffer.data()))
|
||||
{
|
||||
const ssize_t nrecv = recv(
|
||||
client_fd,
|
||||
client_data.in_buffer.data() + client_data.in_buffer_size,
|
||||
client_data.in_buffer.size() - client_data.in_buffer_size,
|
||||
0
|
||||
);
|
||||
if (nrecv < 0 && !(errno == EWOULDBLOCK || errno == EAGAIN))
|
||||
{
|
||||
dwarnln("recv: {}", strerror(errno));
|
||||
epoll_ctl(g_epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
|
||||
window_server.remove_client_fd(client_fd);
|
||||
break;
|
||||
}
|
||||
if (nrecv > 0)
|
||||
client_data.in_buffer_size += nrecv;
|
||||
}
|
||||
|
||||
size_t bytes_handled = 0;
|
||||
while (client_data.in_buffer_size - bytes_handled >= sizeof(LibGUI::PacketHeader))
|
||||
{
|
||||
BAN::ConstByteSpan packet_span = client_data.in_buffer.span().slice(bytes_handled, client_data.in_buffer_size - bytes_handled);
|
||||
const auto header = packet_span.as<const LibGUI::PacketHeader>();
|
||||
if (packet_span.size() < header.size || header.size < sizeof(LibGUI::PacketHeader))
|
||||
break;
|
||||
packet_span = packet_span.slice(0, header.size);
|
||||
|
||||
switch (header.type)
|
||||
{
|
||||
#define WINDOW_PACKET_CASE(enum, function) \
|
||||
case LibGUI::PacketType::enum: \
|
||||
if (auto ret = LibGUI::WindowPacket::enum::deserialize(client_data.packet_buffer.span()); !ret.is_error()) \
|
||||
window_server.function(client_fd, ret.release_value()); \
|
||||
break
|
||||
WINDOW_PACKET_CASE(WindowCreate, on_window_create);
|
||||
WINDOW_PACKET_CASE(WindowInvalidate, on_window_invalidate);
|
||||
WINDOW_PACKET_CASE(WindowSetPosition, on_window_set_position);
|
||||
WINDOW_PACKET_CASE(WindowSetAttributes, on_window_set_attributes);
|
||||
WINDOW_PACKET_CASE(WindowSetMouseRelative, on_window_set_mouse_relative);
|
||||
WINDOW_PACKET_CASE(WindowSetSize, on_window_set_size);
|
||||
WINDOW_PACKET_CASE(WindowSetMinSize, on_window_set_min_size);
|
||||
WINDOW_PACKET_CASE(WindowSetMaxSize, on_window_set_max_size);
|
||||
WINDOW_PACKET_CASE(WindowSetFullscreen, on_window_set_fullscreen);
|
||||
WINDOW_PACKET_CASE(WindowSetTitle, on_window_set_title);
|
||||
WINDOW_PACKET_CASE(WindowSetCursor, on_window_set_cursor);
|
||||
case LibGUI::PacketType::enum: \
|
||||
if (auto ret = LibGUI::WindowPacket::enum::deserialize(packet_span); !ret.is_error()) \
|
||||
window_server.function(client_fd, ret.release_value()); \
|
||||
else \
|
||||
derrorln("invalid packet: {}", ret.error()); \
|
||||
break
|
||||
WINDOW_PACKET_CASE(WindowCreate, on_window_create);
|
||||
WINDOW_PACKET_CASE(WindowInvalidate, on_window_invalidate);
|
||||
WINDOW_PACKET_CASE(WindowSetPosition, on_window_set_position);
|
||||
WINDOW_PACKET_CASE(WindowSetAttributes, on_window_set_attributes);
|
||||
WINDOW_PACKET_CASE(WindowSetMouseRelative, on_window_set_mouse_relative);
|
||||
WINDOW_PACKET_CASE(WindowSetSize, on_window_set_size);
|
||||
WINDOW_PACKET_CASE(WindowSetMinSize, on_window_set_min_size);
|
||||
WINDOW_PACKET_CASE(WindowSetMaxSize, on_window_set_max_size);
|
||||
WINDOW_PACKET_CASE(WindowSetFullscreen, on_window_set_fullscreen);
|
||||
WINDOW_PACKET_CASE(WindowSetTitle, on_window_set_title);
|
||||
WINDOW_PACKET_CASE(WindowSetCursor, on_window_set_cursor);
|
||||
#undef WINDOW_PACKET_CASE
|
||||
default:
|
||||
dprintln("unhandled packet type: {}", *reinterpret_cast<uint32_t*>(client_data.packet_buffer.data()));
|
||||
default:
|
||||
dprintln("unhandled packet type: {}", static_cast<uint32_t>(header.type));
|
||||
break;
|
||||
}
|
||||
|
||||
bytes_handled += header.size;
|
||||
}
|
||||
|
||||
client_data.packet_buffer.clear();
|
||||
client_data.packet_buffer_nread = 0;
|
||||
// NOTE: this will only move a single partial packet, so this is fine
|
||||
client_data.in_buffer_size -= bytes_handled;
|
||||
memmove(
|
||||
client_data.in_buffer.data(),
|
||||
client_data.in_buffer.data() + bytes_handled,
|
||||
client_data.in_buffer_size
|
||||
);
|
||||
|
||||
if (client_data.in_buffer_size >= sizeof(LibGUI::PacketHeader))
|
||||
{
|
||||
const auto header = BAN::ConstByteSpan(client_data.in_buffer.span()).as<const LibGUI::PacketHeader>();
|
||||
if (header.size < sizeof(LibGUI::PacketHeader) || header.size > client_data.in_buffer.size())
|
||||
{
|
||||
dwarnln("client tried to send a {} byte packet", header.size);
|
||||
epoll_ctl(g_epoll_fd, EPOLL_CTL_DEL, client_fd, nullptr);
|
||||
window_server.remove_client_fd(client_fd);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user