Attempts to fix replay recording performance issues.

Replays now use a dedicated thread (rather than thread pool) for write operations.

Moved batch operations to this thread as well. They were previously happening during PVS. Looking at some trace files, these compression ops can easily take 5+ ms in some cases, so moving them somewhere else is appreciated.

Added EventSource instrumentation for PVS and replay recording.
This commit is contained in:
Pieter-Jan Briers
2023-08-27 01:59:30 +02:00
parent 01546f32da
commit 5e1d80be35
5 changed files with 173 additions and 32 deletions

View File

@@ -47,11 +47,11 @@ END TEMPLATE-->
### Other
*None yet*
* Performance improvements for replay recording.
### Internal
*None yet*
* Added some `EventSource` providers for PVS and replay recording: `Robust.Pvs` and `Robust.ReplayRecording`.
## 152.0.0

View File

@@ -1,6 +1,7 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics.Tracing;
using System.Linq;
using System.Threading.Tasks;
using JetBrains.Annotations;
@@ -24,7 +25,6 @@ using Microsoft.Extensions.ObjectPool;
using Prometheus;
using Robust.Server.Replays;
using Robust.Shared.Players;
using Robust.Shared.Map.Components;
namespace Robust.Server.GameStates
{
@@ -219,10 +219,16 @@ namespace Robust.Server.GameStates
{
try
{
var guid = i >= 0 ? players[i].UserId.UserId : default;
PvsEventSource.Log.WorkStart(_gameTiming.CurTick.Value, i, guid);
if (i >= 0)
SendStateUpdate(i, resource, inputSystem, players[i], pvsData, mQuery, tQuery, ref oldestAckValue);
else
_replay.Update();
PvsEventSource.Log.WorkStop(_gameTiming.CurTick.Value, i, guid);
}
catch (Exception e) // Catch EVERY exception
{
@@ -373,5 +379,35 @@ namespace Robust.Server.GameStates
_networkManager.ServerSendMessage(pvsMessage, channel);
}
}
[EventSource(Name = "Robust.Pvs")]
public sealed class PvsEventSource : System.Diagnostics.Tracing.EventSource
{
    /// <summary>Singleton instance used to emit PVS tracing events.</summary>
    public static PvsEventSource Log { get; } = new();

    [Event(1)]
    public void WorkStart(uint tick, int playerIdx, Guid playerGuid) => WriteEvent(1, tick, playerIdx, playerGuid);

    [Event(2)]
    public void WorkStop(uint tick, int playerIdx, Guid playerGuid) => WriteEvent(2, tick, playerIdx, playerGuid);

    // Manual WriteEventCore implementation: there is no built-in WriteEvent
    // overload matching (uint, int, Guid), and this form avoids boxing.
    [NonEvent]
    private unsafe void WriteEvent(int eventId, uint arg1, int arg2, Guid arg3)
    {
        if (!IsEnabled())
            return;

        var data = stackalloc EventData[3];
        data[0].DataPointer = (IntPtr)(&arg1);
        data[0].Size = sizeof(uint);
        data[1].DataPointer = (IntPtr)(&arg2);
        data[1].Size = sizeof(int);
        data[2].DataPointer = (IntPtr)(&arg3);
        data[2].Size = 16; // Guid is 16 bytes.
        WriteEventCore(eventId, 3, data);
    }
}
}
}

View File

@@ -0,0 +1,28 @@
using System.Diagnostics.Tracing;
namespace Robust.Shared.Replays;
internal abstract partial class SharedReplayRecordingManager
{
    /// <summary>
    /// EventSource provider ("Robust.ReplayRecording") used to trace replay
    /// write-queue activity and batch compression on the recording thread.
    /// </summary>
    [EventSource(Name = "Robust.ReplayRecording")]
    public sealed class RecordingEventSource : EventSource
    {
        /// <summary>Singleton instance for emitting recording events.</summary>
        public static RecordingEventSource Log { get; } = new();

        [Event(1)]
        public void WriteTaskStart(int task)
        {
            WriteEvent(1, task);
        }

        [Event(2)]
        public void WriteTaskStop(int task)
        {
            WriteEvent(2, task);
        }

        [Event(3)]
        public void WriteBatchStart(int index)
        {
            WriteEvent(3, index);
        }

        [Event(4)]
        public void WriteBatchStop(int index, int uncompressed, int compressed)
        {
            WriteEvent(4, index, uncompressed, compressed);
        }

        [Event(5)]
        public void WriteQueueBlocked()
        {
            WriteEvent(5);
        }
    }
}

View File

@@ -3,6 +3,7 @@ using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Threading;
using System.Threading.Channels;
using Robust.Shared.Serialization;
using Robust.Shared.Utility;
@@ -25,7 +26,7 @@ internal abstract partial class SharedReplayRecordingManager
// and even then not for much longer than a couple hundred ms at most.
private readonly List<Task> _finalizingWriteTasks = new();
private static void WriteYaml(RecordingState state, ResPath path, YamlDocument data)
private void WriteYaml(RecordingState state, ResPath path, YamlDocument data)
{
var memStream = new MemoryStream();
using var writer = new StreamWriter(memStream);
@@ -43,7 +44,7 @@ internal abstract partial class SharedReplayRecordingManager
WriteBytes(state, path, memStream.AsMemory());
}
private static void WritePooledBytes(
private void WritePooledBytes(
RecordingState state,
ResPath path,
byte[] bytes,
@@ -67,6 +68,45 @@ internal abstract partial class SharedReplayRecordingManager
});
}
/// <summary>
/// Queues a tick batch to be ZStd-compressed and written into the replay zip
/// on the dedicated write thread.
/// </summary>
/// <param name="state">Active recording state (owns the zip and compression context).</param>
/// <param name="path">Relative path of the entry inside the replay zip.</param>
/// <param name="bytes">Pooled buffer holding the batch; ownership transfers to the queued action.</param>
/// <param name="length">Number of valid bytes in <paramref name="bytes"/>.</param>
private void WriteTickBatch(
    RecordingState state,
    ResPath path,
    byte[] bytes,
    int length)
{
    DebugTools.Assert(path.IsRelative, "Zip path should be relative");

    WriteQueueTask(state, () =>
    {
        byte[]? compressBuffer = null;
        try
        {
            // Output layout: [0..4) = uncompressed length, [4..) = ZStd payload.
            var maxCompressed = ZStd.CompressBound(length);
            compressBuffer = ArrayPool<byte>.Shared.Rent(4 + maxCompressed);

            var compressedLength = state.CompressionContext.Compress2(
                compressBuffer.AsSpan(4, maxCompressed),
                bytes.AsSpan(0, length));

            BitConverter.TryWriteBytes(compressBuffer, length);

            // These counters are read from the game thread, hence interlocked.
            Interlocked.Add(ref state.UncompressedSize, length);
            Interlocked.Add(ref state.CompressedSize, compressedLength);

            // Payload is already ZStd-compressed; don't let the zip deflate it again.
            var entry = state.Zip.CreateEntry(path.ToString(), CompressionLevel.NoCompression);
            using var stream = entry.Open();
            stream.Write(compressBuffer, 0, compressedLength + 4);
        }
        finally
        {
            // Return both the caller's batch buffer and our scratch buffer to the pool.
            ArrayPool<byte>.Shared.Return(bytes);
            if (compressBuffer != null)
                ArrayPool<byte>.Shared.Return(compressBuffer);
        }
    });
}
private void WriteToml(RecordingState state, IEnumerable<string> enumerable, ResPath path)
{
var memStream = new MemoryStream();
@@ -75,7 +115,7 @@ internal abstract partial class SharedReplayRecordingManager
WriteBytes(state, path, memStream.AsMemory());
}
private static void WriteBytes(
private void WriteBytes(
RecordingState recState,
ResPath path,
ReadOnlyMemory<byte> bytes,
@@ -91,14 +131,18 @@ internal abstract partial class SharedReplayRecordingManager
});
}
/// <summary>
/// Enqueues a write action onto the recording thread's command channel,
/// blocking the caller if the bounded channel is full.
/// </summary>
private void WriteQueueTask(RecordingState recState, Action a)
{
    var writeAttempt = recState.WriteCommandChannel.WriteAsync(a);
    if (writeAttempt.IsCompletedSuccessfully)
        return;

    // Channel is full: block until the writer thread catches up.
    // Synchronous waiting is safe here: the writing code doesn't rely on the synchronization context.
    RecordingEventSource.Log.WriteQueueBlocked();
    _sawmill.Warning("Forced to wait on replay write queue. Consider increasing replay.write_channel_size!");
    writeAttempt.AsTask().Wait();
}
protected void UpdateWriteTasks()
@@ -154,23 +198,42 @@ internal abstract partial class SharedReplayRecordingManager
return Task.WhenAll(_finalizingWriteTasks);
}
#pragma warning disable RA0004
/// <summary>
/// Main loop of the dedicated replay write thread: drains queued write
/// actions until the channel is completed, then disposes the zip archive
/// and compression context.
/// </summary>
/// <param name="taskCompletionSource">Completed (or faulted) when the loop exits, so callers can await shutdown.</param>
/// <param name="reader">Channel of write actions produced by the game thread.</param>
/// <param name="archive">Replay zip; disposed here since this thread is its sole writer.</param>
/// <param name="compressionContext">ZStd context used by queued actions; disposed here.</param>
private static void WriteQueueLoop(
    TaskCompletionSource taskCompletionSource,
    ChannelReader<Action> reader,
    ZipArchive archive,
    ZStdCompressionContext compressionContext)
{
    try
    {
        var taskIndex = 0;
        // Blocking on .Result is deliberate: this runs on a dedicated
        // thread, not the thread pool, so no starvation/deadlock risk.
        while (reader.WaitToReadAsync().AsTask().Result)
        {
            var work = reader.ReadAsync().AsTask().Result;

            RecordingEventSource.Log.WriteTaskStart(taskIndex);
            work();
            RecordingEventSource.Log.WriteTaskStop(taskIndex);

            taskIndex += 1;
        }

        taskCompletionSource.TrySetResult();
    }
    catch (Exception e)
    {
        taskCompletionSource.TrySetException(e);
    }
    finally
    {
        archive.Dispose();
        compressionContext.Dispose();
    }
}
#pragma warning restore RA0004
}

View File

@@ -19,6 +19,7 @@ using System.IO.Compression;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Robust.Shared.Asynchronous;
@@ -107,7 +108,7 @@ internal abstract partial class SharedReplayRecordingManager : IReplayRecordingM
try
{
WriteGameState(continueRecording: false);
WriteBatch(continueRecording: false);
_sawmill.Info("Replay recording stopped!");
}
catch
@@ -137,7 +138,7 @@ internal abstract partial class SharedReplayRecordingManager : IReplayRecordingM
_sawmill.Info("Reached requested replay recording length. Stopping recording.");
if (!continueRecording || _recState.Buffer.Length > _tickBatchSize)
WriteGameState(continueRecording);
WriteBatch(continueRecording);
}
catch (Exception e)
{
@@ -201,12 +202,19 @@ internal abstract partial class SharedReplayRecordingManager : IReplayRecordingM
new BoundedChannelOptions(NetConf.GetCVar(CVars.ReplayWriteChannelSize))
{
SingleReader = true,
SingleWriter = true,
AllowSynchronousContinuations = false
SingleWriter = true
}
);
var writeTask = Task.Run(() => WriteQueueLoop(commandQueue.Reader, zip));
var writeTaskTcs = new TaskCompletionSource();
// This is on its own thread instead of the thread pool.
// Official SS14 servers write replays to an NFS mount,
// which causes some write calls to have significant latency (~1s).
// We want to avoid clogging thread pool threads with that, so...
var writeThread = new Thread(() => WriteQueueLoop(writeTaskTcs, commandQueue.Reader, zip, context));
writeThread.Priority = ThreadPriority.BelowNormal;
writeThread.Name = "Replay Recording Thread";
writeThread.Start();
_recState = new RecordingState(
zip,
@@ -216,7 +224,7 @@ internal abstract partial class SharedReplayRecordingManager : IReplayRecordingM
Timing.CurTime,
recordingEnd,
commandQueue.Writer,
writeTask,
writeTaskTcs.Task,
directory,
filePath,
state
@@ -252,26 +260,33 @@ internal abstract partial class SharedReplayRecordingManager : IReplayRecordingM
_queuedMessages.Add(obj);
}
private void WriteGameState(bool continueRecording = true)
private void WriteBatch(bool continueRecording = true)
{
DebugTools.Assert(_recState != null);
var batchIndex = _recState.Index++;
RecordingEventSource.Log.WriteBatchStart(batchIndex);
_recState.Buffer.Position = 0;
// Compress stream to buffer.
// First 4 bytes of buffer are reserved for the length of the uncompressed stream.
var bound = ZStd.CompressBound((int)_recState.Buffer.Length);
var buf = ArrayPool<byte>.Shared.Rent(4 + bound);
var length = _recState.CompressionContext.Compress2(buf.AsSpan(4, bound), _recState.Buffer.AsSpan());
BitConverter.TryWriteBytes(buf, (int)_recState.Buffer.Length);
WritePooledBytes(
_recState,
ReplayZipFolder / $"{DataFilePrefix}{_recState.Index++}.{Ext}",
buf, 4 + length, CompressionLevel.NoCompression);
var uncompressed = _recState.Buffer.AsSpan();
var poolData = ArrayPool<byte>.Shared.Rent(uncompressed.Length);
uncompressed.CopyTo(poolData);
_recState.UncompressedSize += (int)_recState.Buffer.Length;
_recState.CompressedSize += length;
if (_recState.UncompressedSize >= _maxUncompressedSize || _recState.CompressedSize >= _maxCompressedSize)
WriteTickBatch(
_recState,
ReplayZipFolder / $"{DataFilePrefix}{batchIndex}.{Ext}",
poolData,
uncompressed.Length);
// Note: these values are ASYNCHRONOUSLY updated from the replay write thread.
// This means reading them here won't get the most up-to-date values,
// and we'll probably always be off-by-one.
// That's considered acceptable.
var uncompressedSize = Interlocked.Read(ref _recState.UncompressedSize);
var compressedSize = Interlocked.Read(ref _recState.CompressedSize);
if (uncompressedSize >= _maxUncompressedSize || compressedSize >= _maxCompressedSize)
{
_sawmill.Info("Reached max replay recording size. Stopping recording.");
continueRecording = false;
@@ -288,8 +303,7 @@ internal abstract partial class SharedReplayRecordingManager : IReplayRecordingM
if (_recState == null)
return;
_recState.CompressionContext.Dispose();
// File stream is always disposed from the worker task.
// File stream & compression context is always disposed from the worker task.
_recState.WriteCommandChannel.Complete();
_recState = null;