Separate PVS serialization from compression & sending (#5246)

Leon Friedrich
2024-06-20 19:18:51 +12:00
committed by GitHub
parent 48ce24e98b
commit 14138fbcc2
8 changed files with 232 additions and 153 deletions

View File

@@ -1,11 +1,13 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Robust.Shared.Collections;
using Robust.Shared.GameObjects;
using Robust.Shared.GameStates;
using Robust.Shared.Network;
using Robust.Shared.Network.Messages;
using Robust.Shared.Player;
using Robust.Shared.Timing;
using Robust.Shared.Utility;
@@ -115,6 +117,16 @@ internal sealed class PvsSession(ICommonSession session, ResizableMemoryRegion<P
/// </summary>
public GameState? State;
+/// <summary>
+/// The serialized <see cref="State"/> object.
+/// </summary>
+public MemoryStream? StateStream;
+/// <summary>
+/// Whether we should force reliable sending of the <see cref="MsgState"/>.
+/// </summary>
+public bool ForceSendReliably { get; set; }
/// <summary>
/// Clears all stored game state data. This should only be used after the game state has been serialized.
/// </summary>
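
Taken together, these additions split each session's per-tick work into a hand-off between two phases: a serialize phase that turns State into StateStream, and a later send phase that compresses and transmits the stream. A minimal sketch of that hand-off, with the engine types replaced by stand-ins (nothing below is actual engine API):

using System.IO;

// Stand-in for PvsSession: only the fields added in this commit.
class Session
{
    public object? State;             // built by ComputeSessionState
    public MemoryStream? StateStream; // filled by the serialize phase
    public bool ForceSendReliably;    // consumed by the send phase
}

static class Pipeline
{
    // Phase 1 (SerializeStates): State -> StateStream, then drop State.
    public static void Serialize(Session s)
    {
        s.StateStream = new MemoryStream();
        // ... the engine calls serializer.SerializeDirect(s.StateStream, s.State) here ...
        s.State = null;
    }

    // Phase 2 (SendStates): compress & transmit StateStream, then dispose it.
    public static void Send(Session s)
    {
        // ... the engine wraps s.StateStream in a MsgState and sends it here ...
        s.StateStream?.Dispose();
        s.StateStream = null;
    }
}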

View File

@@ -75,15 +75,15 @@ namespace Robust.Server.GameStates
return true;
}
-private void CleanupDirty(ICommonSession[] sessions)
+private void CleanupDirty()
{
using var _ = Histogram.WithLabels("Clean Dirty").NewTimer();
if (!CullingEnabled)
{
_seenAllEnts.Clear();
-foreach (var player in sessions)
+foreach (var player in _sessions)
{
-_seenAllEnts.Add(player);
+_seenAllEnts.Add(player.Session);
}
}

View File

@@ -17,13 +17,12 @@ internal sealed partial class PvsSystem
{
private WaitHandle? _leaveTask;
-private void ProcessLeavePvs(ICommonSession[] sessions)
+private void ProcessLeavePvs()
{
-if (!CullingEnabled || sessions.Length == 0)
+if (!CullingEnabled || _sessions.Length == 0)
return;
DebugTools.AssertNull(_leaveTask);
-_leaveJob.Setup(sessions);
if (_async)
{
@@ -76,29 +75,19 @@ internal sealed partial class PvsSystem
{
public int BatchSize => 2;
private PvsSystem _pvs = _pvs;
-public int Count => _sessions.Length;
-private PvsSession[] _sessions;
+public int Count => _pvs._sessions.Length;
public void Execute(int index)
{
try
{
-_pvs.ProcessLeavePvs(_sessions[index]);
+_pvs.ProcessLeavePvs(_pvs._sessions[index]);
}
catch (Exception e)
{
_pvs.Log.Log(LogLevel.Error, e, $"Caught exception while processing pvs-leave messages.");
}
}
-public void Setup(ICommonSession[] sessions)
-{
-// Copy references to PvsSession, in case players disconnect while the job is running.
-Array.Resize(ref _sessions, sessions.Length);
-for (var i = 0; i < sessions.Length; i++)
-{
-_sessions[i] = _pvs.PlayerData[sessions[i]];
-}
-}
}
}
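
With Setup gone, the job indexes _pvs._sessions directly; this is safe because session removal is now deferred to ProcessDisconnections, which runs only while no job is in flight. For readers unfamiliar with Robust's batched parallel jobs, here is a self-contained sketch of how a job exposing BatchSize/Count/Execute is presumably driven; the interface and runner below are illustrative reconstructions, not the engine's actual types:

using System;
using System.Threading.Tasks;

// Inferred shape of a batched parallel job (names are illustrative).
interface IParallelJob
{
    int BatchSize { get; }   // items handled per scheduled work chunk
    int Count { get; }       // total number of items
    void Execute(int index); // process a single item
}

static class JobRunner
{
    // Split Count items into BatchSize-sized chunks and run the chunks in parallel.
    public static void Run(IParallelJob job)
    {
        var batches = (job.Count + job.BatchSize - 1) / job.BatchSize;
        Parallel.For(0, batches, batch =>
        {
            var end = Math.Min((batch + 1) * job.BatchSize, job.Count);
            for (var i = batch * job.BatchSize; i < end; i++)
                job.Execute(i);
        });
    }
}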

View File

@@ -0,0 +1,85 @@
using System;
using System.Threading.Tasks;
using Prometheus;
using Robust.Shared.Log;
using Robust.Shared.Network.Messages;
using Robust.Shared.Player;
using Robust.Shared.Utility;
namespace Robust.Server.GameStates;
internal sealed partial class PvsSystem
{
/// <summary>
/// Compress and send game states to connected clients.
/// </summary>
private void SendStates()
{
// TODO PVS make this async
// AFAICT ForEachAsync doesn't support using a thread-local PvsThreadResources.
// Though if it is getting pooled, does it really matter?
// If this does get run async, then ProcessDisconnections() has to ensure that the job has finished
// before modifying the sessions array.
using var _ = Histogram.WithLabels("Send States").NewTimer();
var opts = new ParallelOptions {MaxDegreeOfParallelism = _parallelMgr.ParallelProcessCount};
Parallel.ForEach(_sessions, opts, _threadResourcesPool.Get, SendSessionState, _threadResourcesPool.Return);
}
private PvsThreadResources SendSessionState(PvsSession data, ParallelLoopState state, PvsThreadResources resource)
{
try
{
SendSessionState(data, resource.CompressionContext);
}
catch (Exception e)
{
Log.Log(LogLevel.Error, e, $"Caught exception while sending mail for {data.Session}.");
}
return resource;
}
private void SendSessionState(PvsSession data, ZStdCompressionContext ctx)
{
DebugTools.AssertEqual(data.State, null);
// PVS benchmarks use dummy sessions.
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
if (data.Session.Channel is not DummyChannel)
{
DebugTools.AssertNotEqual(data.StateStream, null);
var msg = new MsgState
{
StateStream = data.StateStream,
ForceSendReliably = data.ForceSendReliably,
CompressionContext = ctx
};
_netMan.ServerSendMessage(msg, data.Session.Channel);
if (msg.ShouldSendReliably())
{
data.RequestedFull = false;
data.LastReceivedAck = _gameTiming.CurTick;
lock (PendingAcks)
{
PendingAcks.Add(data.Session);
}
}
}
else
{
// Always "ack" dummy sessions.
data.LastReceivedAck = _gameTiming.CurTick;
data.RequestedFull = false;
lock (PendingAcks)
{
PendingAcks.Add(data.Session);
}
}
data.StateStream?.Dispose();
data.StateStream = null;
}
}
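
The Parallel.ForEach overload used above threads a per-worker resource through the loop via localInit/localFinally delegates, so each worker rents one PvsThreadResources (and thus one ZStd compression context) from the pool and returns it when the loop finishes. A self-contained sketch of the same pattern using Microsoft.Extensions.ObjectPool, the same package PvsSystem already imports; the WorkerResources type and Example class are stand-ins:

using System;
using System.Threading.Tasks;
using Microsoft.Extensions.ObjectPool;

class WorkerResources
{
    // Stand-in for an expensive per-worker object such as a compression context.
    public byte[] Buffer = new byte[64 * 1024];
}

static class Example
{
    static readonly ObjectPool<WorkerResources> Pool =
        new DefaultObjectPool<WorkerResources>(
            new DefaultPooledObjectPolicy<WorkerResources>());

    static void Main()
    {
        var items = new int[1000];
        var opts = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

        // localInit rents a resource per worker; localFinally returns it to the pool.
        Parallel.ForEach(items, opts,
            Pool.Get,
            (item, state, res) =>
            {
                // ... use res.Buffer to process `item` ...
                return res; // hand the resource to the next iteration on this worker
            },
            Pool.Return);
    }
}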

View File

@@ -0,0 +1,73 @@
using System;
using System.Threading.Tasks;
using Prometheus;
using Robust.Shared.GameObjects;
using Robust.Shared.GameStates;
using Robust.Shared.IoC;
using Robust.Shared.Log;
using Robust.Shared.Player;
using Robust.Shared.Serialization;
using Robust.Shared.Timing;
using Robust.Shared.Utility;
namespace Robust.Server.GameStates;
internal sealed partial class PvsSystem
{
[Dependency] private readonly IRobustSerializer _serializer = default!;
/// <summary>
/// Get and serialize <see cref="GameState"/> objects for each player. Compressing & sending the states is done later.
/// </summary>
private void SerializeStates()
{
using var _ = Histogram.WithLabels("Serialize States").NewTimer();
var opts = new ParallelOptions {MaxDegreeOfParallelism = _parallelMgr.ParallelProcessCount};
_oldestAck = GameTick.MaxValue.Value;
Parallel.For(-1, _sessions.Length, opts, SerializeState);
}
/// <summary>
/// Get and serialize a <see cref="GameState"/> for a single session (or the current replay).
/// </summary>
private void SerializeState(int i)
{
try
{
var guid = i >= 0 ? _sessions[i].Session.UserId.UserId : default;
ServerGameStateManager.PvsEventSource.Log.WorkStart(_gameTiming.CurTick.Value, i, guid);
if (i >= 0)
SerializeSessionState(_sessions[i]);
else
_replay.Update();
ServerGameStateManager.PvsEventSource.Log.WorkStop(_gameTiming.CurTick.Value, i, guid);
}
catch (Exception e) // Catch EVERY exception
{
var source = i >= 0 ? _sessions[i].Session.ToString() : "replays";
Log.Log(LogLevel.Error, e, $"Caught exception while serializing game state for {source}.");
}
}
/// <summary>
/// Get and serialize a <see cref="GameState"/> for a single session.
/// </summary>
private void SerializeSessionState(PvsSession data)
{
ComputeSessionState(data);
InterlockedHelper.Min(ref _oldestAck, data.FromTick.Value);
DebugTools.AssertEqual(data.StateStream, null);
// PVS benchmarks use dummy sessions.
// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
if (data.Session.Channel is not DummyChannel)
{
data.StateStream = RobustMemoryManager.GetMemoryStream();
_serializer.SerializeDirect(data.StateStream, data.State);
}
data.ClearState();
}
}
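
The Parallel.For(-1, _sessions.Length, ...) call is the trick that lets the replay's state build run in parallel with per-player serialization: index -1 is reserved for the replay and indices >= 0 map to sessions. A self-contained illustration of the pattern:

using System;
using System.Threading.Tasks;

static class Example
{
    static void Main()
    {
        var sessions = new[] { "Alice", "Bob", "Carol" };

        // Start at -1 so the replay becomes just one more parallel work item.
        Parallel.For(-1, sessions.Length, i =>
        {
            if (i >= 0)
                Console.WriteLine($"serialize state for {sessions[i]}");
            else
                Console.WriteLine("update replay recording");
        });
    }
}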

View File

@@ -7,8 +7,6 @@ using Robust.Shared.Enums;
using Robust.Shared.GameObjects;
using Robust.Shared.GameStates;
using Robust.Shared.Map;
-using Robust.Shared.Network;
-using Robust.Shared.Network.Messages;
using Robust.Shared.Player;
using Robust.Shared.Timing;
using Robust.Shared.Utility;
@@ -27,49 +25,6 @@ internal sealed partial class PvsSystem
private List<ICommonSession> _disconnected = new();
-private void SendStateUpdate(ICommonSession session, PvsThreadResources resources)
-{
-var data = GetOrNewPvsSession(session);
-ComputeSessionState(data);
-InterlockedHelper.Min(ref _oldestAck, data.FromTick.Value);
-// actually send the state
-var msg = new MsgState
-{
-State = data.State,
-CompressionContext = resources.CompressionContext
-};
-// PVS benchmarks use dummy sessions.
-// ReSharper disable once ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
-if (session.Channel is not DummyChannel)
-{
-_netMan.ServerSendMessage(msg, session.Channel);
-if (msg.ShouldSendReliably())
-{
-data.RequestedFull = false;
-data.LastReceivedAck = _gameTiming.CurTick;
-lock (PendingAcks)
-{
-PendingAcks.Add(session);
-}
-}
-}
-else
-{
-// Always "ack" dummy sessions.
-data.LastReceivedAck = _gameTiming.CurTick;
-data.RequestedFull = false;
-lock (PendingAcks)
-{
-PendingAcks.Add(session);
-}
-}
-data.ClearState();
-}
private PvsSession GetOrNewPvsSession(ICommonSession session)
{
if (!PlayerData.TryGetValue(session, out var pvsSession))
@@ -104,7 +59,7 @@ internal sealed partial class PvsSystem
session.PlayerStates,
_deletedEntities);
-session.State.ForceSendReliably = session.RequestedFull
+session.ForceSendReliably = session.RequestedFull
|| _gameTiming.CurTick > session.LastReceivedAck + (uint) ForceAckThreshold;
}

View File

@@ -3,10 +3,8 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Numerics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.ObjectPool;
using Prometheus;
using Robust.Server.Configuration;
@@ -16,9 +14,6 @@ using Robust.Server.Replays;
using Robust.Shared;
using Robust.Shared.Configuration;
using Robust.Shared.GameObjects;
using Robust.Shared.GameStates;
using Robust.Shared.IoC;
using Robust.Shared.Log;
using Robust.Shared.Map;
using Robust.Shared.Network;
using Robust.Shared.Player;
@@ -99,6 +94,10 @@ internal sealed partial class PvsSystem : EntitySystem
/// </summary>
private readonly List<GameTick> _deletedTick = new();
+/// <summary>
+/// The sessions that are currently being processed. Note that this is generally used by parallel & async tasks;
+/// hence, player disconnection processing is deferred and only runs via <see cref="ProcessDisconnections"/>.
+/// </summary>
+private PvsSession[] _sessions = default!;
private bool _async;
@@ -183,52 +182,25 @@ internal sealed partial class PvsSystem : EntitySystem
/// </summary>
internal void SendGameStates(ICommonSession[] players)
{
+// Wait for pending jobs and process disconnected players
+ProcessDisconnections();
+// Ensure each session has a PvsSession entry before starting any parallel jobs.
+CacheSessionData(players);
// Get visible chunks, and update any dirty chunks.
-BeforeSendState();
+BeforeSerializeStates();
-// Construct & send the game state to each player.
-SendStates(players);
+// Construct & serialize the game state for each player (and for the replay).
+SerializeStates();
+// Compress & send the states.
+SendStates();
// Cull deletion history
-AfterSendState(players);
+AfterSerializeStates();
-ProcessLeavePvs(players);
}
-private void SendStates(ICommonSession[] players)
-{
-using var _ = Histogram.WithLabels("Send States").NewTimer();
-var opts = new ParallelOptions {MaxDegreeOfParallelism = _parallelMgr.ParallelProcessCount};
-_oldestAck = GameTick.MaxValue.Value;
-// Replays process game states in parallel with players
-Parallel.For(-1, players.Length, opts, _threadResourcesPool.Get, SendPlayer, _threadResourcesPool.Return);
-PvsThreadResources SendPlayer(int i, ParallelLoopState state, PvsThreadResources resource)
-{
-try
-{
-var guid = i >= 0 ? players[i].UserId.UserId : default;
-ServerGameStateManager.PvsEventSource.Log.WorkStart(_gameTiming.CurTick.Value, i, guid);
-if (i >= 0)
-SendStateUpdate(players[i], resource);
-else
-_replay.Update();
-ServerGameStateManager.PvsEventSource.Log.WorkStop(_gameTiming.CurTick.Value, i, guid);
-}
-catch (Exception e) // Catch EVERY exception
-{
-var source = i >= 0 ? players[i].ToString() : "replays";
-Log.Log(LogLevel.Error, e, $"Caught exception while generating mail for {source}.");
-}
-return resource;
-}
+ProcessLeavePvs();
}
private void ResetParallelism(int _) => ResetParallelism();
@@ -414,23 +386,11 @@ internal sealed partial class PvsSystem : EntitySystem
}
}
-private void BeforeSendState()
+private void BeforeSerializeStates()
{
DebugTools.Assert(_chunks.Values.All(x => Exists(x.Map) && Exists(x.Root)));
DebugTools.Assert(_chunkSets.Keys.All(Exists));
-_leaveTask?.WaitOne();
-_leaveTask = null;
-foreach (var session in _disconnected)
-{
-if (PlayerData.Remove(session, out var pvsSession))
-{
-ClearSendHistory(pvsSession);
-FreeSessionDataMemory(pvsSession);
-}
-}
var ackJob = ProcessQueuedAcks();
// Figure out what chunks players can see and cache some chunk data.
@@ -443,6 +403,21 @@ internal sealed partial class PvsSystem : EntitySystem
ackJob?.WaitOne();
}
+internal void ProcessDisconnections()
+{
+_leaveTask?.WaitOne();
+_leaveTask = null;
+foreach (var session in _disconnected)
+{
+if (PlayerData.Remove(session, out var pvsSession))
+{
+ClearSendHistory(pvsSession);
+FreeSessionDataMemory(pvsSession);
+}
+}
+}
+internal void CacheSessionData(ICommonSession[] players)
+{
+Array.Resize(ref _sessions, players.Length);
@@ -452,9 +427,9 @@ internal sealed partial class PvsSystem : EntitySystem
}
}
-private void AfterSendState(ICommonSession[] players)
+private void AfterSerializeStates()
{
-CleanupDirty(players);
+CleanupDirty();
if (_oldestAck == GameTick.MaxValue.Value)
{
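
The _oldestAck sentinel checked above is maintained from the parallel serialize loop via InterlockedHelper.Min in SerializeSessionState: it survives as GameTick.MaxValue.Value only if no session updated it. That helper's body is not part of this diff; an atomic min over a shared uint is conventionally a compare-exchange loop, roughly like this sketch (the name and signature below are assumptions, not the engine's code):

using System.Threading;

static class InterlockedMinSketch
{
    // Atomically stores min(location, value) into location.
    // Unsigned Interlocked overloads exist on modern .NET.
    public static void Min(ref uint location, uint value)
    {
        var observed = Volatile.Read(ref location);
        while (value < observed)
        {
            var previous = Interlocked.CompareExchange(ref location, value, observed);
            if (previous == observed)
                return;          // we won the race
            observed = previous; // someone else updated it; re-check
        }
    }
}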

View File

@@ -17,15 +17,21 @@ namespace Robust.Shared.Network.Messages
// Ideally we would peg this to the actual configured MTU instead of the default constant, but oh well...
public const int ReliableThreshold = NetPeerConfiguration.kDefaultMTU - 20;
-// If a state is larger than this, compress it with deflate.
+// If a state is larger than this, we will compress it
+// TODO PVS make this a cvar
+// TODO PVS figure out optimal value
public const int CompressionThreshold = 256;
public override MsgGroups MsgGroup => MsgGroups.Entity;
-public GameState State;
+public MemoryStream StateStream;
public ZStdCompressionContext CompressionContext;
-internal bool _hasWritten;
+internal bool HasWritten;
+internal bool ForceSendReliably;
public override void ReadFromBuffer(NetIncomingMessage buffer, IRobustSerializer serializer)
{
@@ -60,26 +66,19 @@ namespace Robust.Shared.Network.Messages
public override void WriteToBuffer(NetOutgoingMessage buffer, IRobustSerializer serializer)
{
-using var stateStream = RobustMemoryManager.GetMemoryStream();
-serializer.SerializeDirect(stateStream, State);
-buffer.WriteVariableInt32((int)stateStream.Length);
+buffer.WriteVariableInt32((int)StateStream.Length);
// We compress the state.
-if (stateStream.Length > CompressionThreshold)
+if (StateStream.Length > CompressionThreshold)
{
-// var sw = Stopwatch.StartNew();
-stateStream.Position = 0;
-var buf = ArrayPool<byte>.Shared.Rent(ZStd.CompressBound((int)stateStream.Length));
-var length = CompressionContext.Compress2(buf, stateStream.AsSpan());
+StateStream.Position = 0;
+var buf = ArrayPool<byte>.Shared.Rent(ZStd.CompressBound((int)StateStream.Length));
+var length = CompressionContext.Compress2(buf, StateStream.AsSpan());
buffer.WriteVariableInt32(length);
buffer.Write(buf.AsSpan(0, length));
-// var elapsed = sw.Elapsed;
-// System.Console.WriteLine(
-// $"From: {State.FromSequence} To: {State.ToSequence} Size: {length} B Before: {stateStream.Length} B time: {elapsed}");
ArrayPool<byte>.Shared.Return(buf);
}
// The state is sent as is.
@@ -87,10 +86,10 @@ namespace Robust.Shared.Network.Messages
{
// 0 means that the state isn't compressed.
buffer.WriteVariableInt32(0);
-buffer.Write(stateStream.AsSpan());
+buffer.Write(StateStream.AsSpan());
}
-_hasWritten = true;
+HasWritten = true;
MsgSize = buffer.LengthBytes;
}
@@ -101,21 +100,12 @@ namespace Robust.Shared.Network.Messages
/// <returns></returns>
public bool ShouldSendReliably()
{
-DebugTools.Assert(_hasWritten, "Attempted to determine sending method before determining packet size.");
-return State.ForceSendReliably || MsgSize > ReliableThreshold;
+DebugTools.Assert(HasWritten, "Attempted to determine sending method before determining packet size.");
+return ForceSendReliably || MsgSize > ReliableThreshold;
}
-public override NetDeliveryMethod DeliveryMethod
-{
-get
-{
-if (ShouldSendReliably())
-{
-return NetDeliveryMethod.ReliableUnordered;
-}
-return base.DeliveryMethod;
-}
-}
+public override NetDeliveryMethod DeliveryMethod => ShouldSendReliably()
+? NetDeliveryMethod.ReliableUnordered
+: base.DeliveryMethod;
}
}
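
For reference, the payload framing WriteToBuffer now produces (and that ReadFromBuffer must parse on the client) is: the uncompressed length, then the compressed length (0 meaning the payload was sent as-is), then the payload bytes. A self-contained sketch of that framing, with plain Int32s standing in for Lidgren's variable-length ints and DeflateStream standing in for the engine's ZStd context:

using System;
using System.IO;
using System.IO.Compression;

static class StateFraming
{
    const int CompressionThreshold = 256; // mirrors MsgState.CompressionThreshold

    // Writes: [uncompressedLength][compressedLength or 0][payload].
    public static void Write(BinaryWriter buffer, byte[] state)
    {
        buffer.Write(state.Length);
        if (state.Length > CompressionThreshold)
        {
            using var ms = new MemoryStream();
            using (var deflate = new DeflateStream(ms, CompressionLevel.Fastest, leaveOpen: true))
                deflate.Write(state, 0, state.Length);
            var compressed = ms.ToArray();
            buffer.Write(compressed.Length);
            buffer.Write(compressed);
        }
        else
        {
            buffer.Write(0); // 0 means the payload is not compressed
            buffer.Write(state);
        }
    }

    // Reads the same framing back into the raw state bytes.
    public static byte[] Read(BinaryReader buffer)
    {
        var uncompressedLength = buffer.ReadInt32();
        var compressedLength = buffer.ReadInt32();
        if (compressedLength == 0)
            return buffer.ReadBytes(uncompressedLength);

        using var deflate = new DeflateStream(
            new MemoryStream(buffer.ReadBytes(compressedLength)), CompressionMode.Decompress);
        var result = new byte[uncompressedLength];
        deflate.ReadExactly(result);
        return result;
    }
}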