mirror of
https://github.com/space-wizards/RobustToolbox.git
synced 2026-02-14 19:29:36 +01:00
* Improve map serialization error logging * Prevent remove children of erroring entities * better logging * Improve error tolerance * Even more exception tolerance * missing ! * Add WriteYaml and WriteObject to IReplayFileWriter * Add MapLoaderSystem.TrySaveAllEntities() * On second thought, WriteObject will just be abused * I forgot to commit * Add default implementation to avoid breaking changes * release notes * fix merge issues --------- Co-authored-by: metalgearsloth <31366439+metalgearsloth@users.noreply.github.com>
254 lines
8.2 KiB
C#
254 lines
8.2 KiB
C#
using System;
|
|
using System.Buffers;
|
|
using System.Collections.Generic;
|
|
using System.IO;
|
|
using System.IO.Compression;
|
|
using System.Threading;
|
|
using System.Threading.Channels;
|
|
using Robust.Shared.Serialization;
|
|
using Robust.Shared.Utility;
|
|
using System.Threading.Tasks;
|
|
using Robust.Shared.Log;
|
|
using YamlDotNet.Core;
|
|
using YamlDotNet.RepresentationModel;
|
|
|
|
namespace Robust.Shared.Replays;
|
|
|
|
// This partial class has various methods for async file writing (in case the path is on a networked drive or something like that)
|
|
internal abstract partial class SharedReplayRecordingManager
|
|
{
|
|
// To avoid stuttering the main thread, we do IO (and writing to the zip in general) in the thread pool.
|
|
|
|
// While recording a replay, the Task for the write queue is stored in the RecordingState.
|
|
// However, when the replay recording is finished, we immediately clear _recState before the write queue has finished.
|
|
// As such, we need to track the task here.
|
|
// In practice, this list will most likely never contain more than a single element,
|
|
// and even then not for much longer than a couple hundred ms at most.
|
|
private readonly List<Task> _finalizingWriteTasks = new();
|
|
|
|
/// <summary>
/// Emits a YAML document into an in-memory buffer and queues that buffer to be written as a zip entry.
/// </summary>
/// <param name="state">Recording state that the write is queued on.</param>
/// <param name="path">Path of the entry inside the zip (handed on to <c>WriteBytes</c>).</param>
/// <param name="data">The YAML document to emit.</param>
/// <param name="level">Zip compression level to use for the entry.</param>
private void WriteYaml(
    RecordingState state,
    ResPath path,
    YamlDocument data,
    CompressionLevel level = CompressionLevel.Optimal)
{
    var buffer = new MemoryStream();
    using var textWriter = new StreamWriter(buffer);

    var emitter = new YamlMappingFix(new Emitter(textWriter));
    var documents = new YamlStream();
    documents.Add(data);
    documents.Save(emitter, false);

    // Push whatever the StreamWriter is still buffering into the memory stream before grabbing its contents.
    textWriter.Flush();

    WriteBytes(state, path, buffer.AsMemory(), level);
}
|
|
|
|
/// <summary>
/// Serializes an object with <c>_serializer.SerializeDirect</c> into an in-memory buffer and queues that
/// buffer to be written as a zip entry.
/// </summary>
/// <typeparam name="T">Type of the object being serialized.</typeparam>
/// <param name="state">Recording state that the write is queued on.</param>
/// <param name="path">Path of the entry inside the zip (handed on to <c>WriteBytes</c>).</param>
/// <param name="obj">The object to serialize.</param>
/// <param name="level">Zip compression level to use for the entry.</param>
private void WriteSerializer<T>(
    RecordingState state,
    ResPath path,
    T obj,
    CompressionLevel level = CompressionLevel.Optimal)
{
    var buffer = new MemoryStream();
    _serializer.SerializeDirect(buffer, obj);

    var payload = buffer.AsMemory();
    WriteBytes(state, path, payload, level);
}
|
|
|
|
/// <summary>
/// Queues a write of the first <paramref name="length"/> bytes of a pooled buffer into a new zip entry.
/// The buffer is handed back to <see cref="ArrayPool{T}.Shared"/> once the queued write has run,
/// whether or not it succeeded.
/// </summary>
/// <param name="state">Recording state whose zip archive receives the entry.</param>
/// <param name="path">Entry path inside the zip; asserted to be relative.</param>
/// <param name="bytes">Buffer holding the data; returned to the shared array pool by the queued action.</param>
/// <param name="length">Number of bytes of <paramref name="bytes"/> to write.</param>
/// <param name="compression">Zip compression level to use for the entry.</param>
private void WritePooledBytes(
    RecordingState state,
    ResPath path,
    byte[] bytes,
    int length,
    CompressionLevel compression)
{
    DebugTools.Assert(path.IsRelative, "Zip path should be relative");

    WriteQueueTask(state, WriteThenRelease);
    return;

    void WriteThenRelease()
    {
        try
        {
            var zipEntry = state.Zip.CreateEntry(path.ToString(), compression);
            using (var output = zipEntry.Open())
            {
                output.Write(bytes, 0, length);
            }
        }
        finally
        {
            // Always give the buffer back, even if creating or writing the entry threw.
            ArrayPool<byte>.Shared.Return(bytes);
        }
    }
}
|
|
|
|
/// <summary>
/// Queues a ZStd compression of the first <paramref name="length"/> bytes of a pooled buffer and writes
/// the result into a new, uncompressed (at the zip level) entry. The entry starts with a 4-byte header
/// holding the uncompressed length, followed by the compressed payload.
/// </summary>
/// <param name="state">Recording state providing the zip archive, compression context and size counters.</param>
/// <param name="path">Entry path inside the zip; asserted to be relative.</param>
/// <param name="bytes">Buffer holding the data; returned to the shared array pool by the queued action.</param>
/// <param name="length">Number of bytes of <paramref name="bytes"/> to compress.</param>
private void WriteTickBatch(
    RecordingState state,
    ResPath path,
    byte[] bytes,
    int length)
{
    DebugTools.Assert(path.IsRelative, "Zip path should be relative");

    WriteQueueTask(state, CompressAndWrite);
    return;

    void CompressAndWrite()
    {
        // Size of the prefix that stores the length of the uncompressed data.
        const int headerSize = 4;

        byte[]? compressed = null;
        try
        {
            // Rent enough room for the header plus the worst-case compressed size.
            var maxCompressed = ZStd.CompressBound(length);
            compressed = ArrayPool<byte>.Shared.Rent(headerSize + maxCompressed);

            var destination = compressed.AsSpan(headerSize, maxCompressed);
            var source = bytes.AsSpan(0, length);
            var compressedLength = state.CompressionContext.Compress2(destination, source);

            // Fill in the header now that the payload sits right behind it.
            BitConverter.TryWriteBytes(compressed, length);

            Interlocked.Add(ref state.UncompressedSize, length);
            Interlocked.Add(ref state.CompressedSize, compressedLength);

            // The data is already ZStd-compressed, so the zip entry itself is stored without compression.
            var zipEntry = state.Zip.CreateEntry(path.ToString(), CompressionLevel.NoCompression);
            using (var output = zipEntry.Open())
            {
                output.Write(compressed, 0, compressedLength + headerSize);
            }
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(bytes);

            if (compressed is not null)
            {
                ArrayPool<byte>.Shared.Return(compressed);
            }
        }
    }
}
|
|
|
|
/// <summary>
/// Saves the given entries to an in-memory TOML stream via <c>NetConf.SaveToTomlStream</c> and queues the
/// result to be written as a zip entry, using the default compression level of <c>WriteBytes</c>.
/// </summary>
/// <param name="state">Recording state that the write is queued on.</param>
/// <param name="enumerable">The strings passed on to <c>NetConf.SaveToTomlStream</c>.</param>
/// <param name="path">Path of the entry inside the zip (handed on to <c>WriteBytes</c>).</param>
private void WriteToml(RecordingState state, IEnumerable<string> enumerable, ResPath path)
{
    var buffer = new MemoryStream();
    NetConf.SaveToTomlStream(buffer, enumerable);

    var payload = buffer.AsMemory();
    WriteBytes(state, path, payload);
}
|
|
|
|
/// <summary>
/// Queues a write of the given bytes into a new entry of the recording's zip archive.
/// </summary>
/// <param name="recState">Recording state whose zip archive receives the entry.</param>
/// <param name="path">Entry path inside the zip; asserted to be relative.</param>
/// <param name="bytes">The data to write. It is read when the queued action runs, not when this method is called.</param>
/// <param name="compression">Zip compression level to use for the entry.</param>
private void WriteBytes(
    RecordingState recState,
    ResPath path,
    ReadOnlyMemory<byte> bytes,
    CompressionLevel compression = CompressionLevel.Optimal)
{
    DebugTools.Assert(path.IsRelative, "Zip path should be relative");

    WriteQueueTask(recState, WriteEntry);
    return;

    void WriteEntry()
    {
        var zipEntry = recState.Zip.CreateEntry(path.ToString(), compression);
        using (var output = zipEntry.Open())
        {
            output.Write(bytes.Span);
        }
    }
}
|
|
|
|
/// <summary>
/// Pushes an action onto the recording's write command channel, blocking the calling thread until the
/// channel has accepted it.
/// </summary>
/// <remarks>
/// The "write queue blocked" event and warning are only emitted when the write is genuinely pending
/// (i.e. the channel is full). A write that has already failed, for example because the channel was
/// closed, is not reported as a full queue; its exception is simply surfaced by the wait below.
/// </remarks>
/// <param name="recState">Recording state owning the write command channel.</param>
/// <param name="a">The action to enqueue.</param>
/// <exception cref="AggregateException">The write to the channel failed.</exception>
private void WriteQueueTask(RecordingState recState, Action a)
{
    var task = recState.WriteCommandChannel.WriteAsync(a);

    // Fast path: the channel had room and took the action immediately.
    if (task.IsCompletedSuccessfully)
        return;

    // Only a still-pending write means the channel is full. An already-completed (faulted/cancelled) write
    // must not be reported as back-pressure, as that would send people off to tune the channel size.
    if (!task.IsCompleted)
    {
        RecordingEventSource.Log.WriteQueueBlocked();
        _sawmill.Warning("Forced to wait on replay write queue. Consider increasing replay.write_channel_size!");
    }

    // Synchronous waiting is safe here: the writing code doesn't rely on the synchronization context.
    // For an already-failed write this returns immediately and rethrows the failure.
    task.AsTask().Wait();
}
|
|
|
|
/// <summary>
/// Checks the write task of the active recording (if any) for failure, and prunes finished tasks from
/// <c>_finalizingWriteTasks</c>, logging how each of them ended.
/// </summary>
/// <remarks>
/// The write tasks complete on another thread, so a finalizing task's completion is checked once, up front.
/// Only after it is known to be complete is its outcome classified and the task removed. Checking the
/// outcome first and completion afterwards would allow a task that faults in between to be removed without
/// its exception ever being logged.
/// </remarks>
protected void UpdateWriteTasks()
{
    if (_recState != null)
    {
        // We are actively recording a replay. Check the status of the write task to make sure nothing went wrong.
        if (_recState.WriteTask.IsFaulted)
        {
            _sawmill.Log(
                LogLevel.Error,
                _recState.WriteTask.Exception,
                "Write task failed while recording due to exception, aborting recording!");

            Reset();
        }
        else if (_recState.WriteTask.IsCompleted)
        {
            // This shouldn't be possible since the write task only exits if we close the channel,
            // which we only do while clearing _recState.
            _sawmill.Error("Write task completed, but did not report an error?");
        }
    }

    // Walk backwards so that RemoveSwap only ever moves an element that has already been visited.
    for (var i = _finalizingWriteTasks.Count - 1; i >= 0; i--)
    {
        var task = _finalizingWriteTasks[i];

        // Still running: leave it in the list and look again on the next update.
        if (!task.IsCompleted)
            continue;

        // The task is in its final state from here on, so the checks below cannot race with the writer.
        if (task.IsCompletedSuccessfully)
        {
            _sawmill.Debug("Write task finalized cleanly");
        }
        else if (task.IsFaulted)
        {
            _sawmill.Log(
                LogLevel.Error,
                task.Exception,
                "Write task hit exception while finalizing, replay may have been corrupted!");
        }

        _finalizingWriteTasks.RemoveSwap(i);
    }
}
|
|
|
|
/// <summary>
/// Refreshes the state of the tracked write tasks and reports whether any finalizing write task remains.
/// </summary>
/// <returns><c>true</c> if <c>_finalizingWriteTasks</c> is non-empty after the refresh.</returns>
public bool IsWriting()
{
    // Drop tasks that have finished since the last check before looking at the list.
    UpdateWriteTasks();

    var stillFinalizing = _finalizingWriteTasks.Count != 0;
    return stillFinalizing;
}
|
|
|
|
/// <summary>
/// Returns a task that completes once every finalizing write task currently being tracked has completed.
/// </summary>
/// <returns>The combined task of all entries in <c>_finalizingWriteTasks</c>.</returns>
/// <exception cref="InvalidOperationException">A replay is still being recorded.</exception>
public Task WaitWriteTasks()
{
    if (!IsRecording)
    {
        // Refresh first, so tasks that have already finished (or failed) get logged and pruned.
        UpdateWriteTasks();

        var remaining = Task.WhenAll(_finalizingWriteTasks);
        return remaining;
    }

    throw new InvalidOperationException("Cannot wait for writes to finish while still recording replay");
}
|
|
|
|
#pragma warning disable RA0004
/// <summary>
/// Runs queued write actions one after another until the channel is completed, then disposes the archive
/// and the compression context and finally reports the outcome through
/// <paramref name="taskCompletionSource"/>.
/// </summary>
/// <remarks>
/// <para>
/// The completion source is only signalled after both resources have been disposed. This way anything
/// waiting on the task does not resume while the archive is still being finalized, and a failure during
/// disposal faults the task instead of escaping this method unreported.
/// </para>
/// <para>
/// The loop blocks on the channel with <c>.Result</c> on purpose; RA0004 is suppressed around this method,
/// presumably for exactly that reason.
/// </para>
/// </remarks>
/// <param name="taskCompletionSource">Completed (or faulted) once the loop has exited and cleanup is done.</param>
/// <param name="reader">Channel the write actions are read from.</param>
/// <param name="archive">Zip archive the actions write to; disposed when the loop exits.</param>
/// <param name="compressionContext">Compression context used by the actions; disposed when the loop exits.</param>
private static void WriteQueueLoop(
    TaskCompletionSource taskCompletionSource,
    ChannelReader<Action> reader,
    ZipArchive archive,
    ZStdCompressionContext compressionContext)
{
    // First failure encountered, whether from a write action or from cleanup.
    Exception? failure = null;

    try
    {
        var i = 0;
        while (true)
        {
            var result = reader.WaitToReadAsync().AsTask().Result;

            // False means the channel was completed and drained: nothing more will ever arrive.
            if (!result)
                break;

            var action = reader.ReadAsync().AsTask().Result;
            RecordingEventSource.Log.WriteTaskStart(i);
            action();
            RecordingEventSource.Log.WriteTaskStop(i);

            i += 1;
        }
    }
    catch (Exception e)
    {
        failure = e;
    }

    // Dispose each resource independently, so that one throwing does not leak the other.
    try
    {
        archive.Dispose();
    }
    catch (Exception e)
    {
        failure ??= e;
    }

    try
    {
        compressionContext.Dispose();
    }
    catch (Exception e)
    {
        failure ??= e;
    }

    // Signal last: waiters must never observe completion before the archive has been closed.
    if (failure == null)
        taskCompletionSource.TrySetResult();
    else
        taskCompletionSource.TrySetException(failure);
}
#pragma warning restore RA0004
|
|
}
|