Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
<Compile Include="System\Formats\Tar\TarWriter.cs" />
<Compile Include="System\Formats\Tar\TarWriterOptions.cs" />
<Compile Include="System\Formats\Tar\GnuSparseStream.cs" />
<Compile Include="System\Formats\Tar\TarReadWriteAdapter.cs" />
<Compile Include="$(CommonPath)DisableRuntimeMarshalling.cs" Link="Common\DisableRuntimeMarshalling.cs" />
<Compile Include="$(CommonPath)System\IO\SubReadStream.cs" Link="Common\System\IO\SubReadStream.cs" />
<Compile Include="$(CommonPath)System\IO\Archiving.Utils.cs" Link="Common\System\IO\Archiving.Utils.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@ public Stream? DataStream
// This entry came from a reader, so if the underlying stream is unseekable, we need to
// manually advance the stream pointer to the next header before doing the substitution
// The original stream will get disposed when the reader gets disposed.
_readerOfOrigin.AdvanceDataStreamIfNeeded();
ValueTask vt = _readerOfOrigin.AdvanceDataStreamIfNeededCoreAsync<SyncReadWriteAdapter>(CancellationToken.None);
Debug.Assert(vt.IsCompleted, "Synchronous AdvanceDataStreamIfNeeded completed asynchronously.");
vt.GetAwaiter().GetResult();
// We only do this once
_readerOfOrigin = null;
}
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

100 changes: 44 additions & 56 deletions src/libraries/System.Formats.Tar/src/System/Formats/Tar/TarHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO;
Expand Down Expand Up @@ -51,25 +52,24 @@ internal static int GetDefaultMode(TarEntryType type)
// Helps advance the stream a total number of bytes larger than int.MaxValue.
internal static void AdvanceStream(Stream archiveStream, long bytesToDiscard)
{
if (archiveStream.CanSeek)
{
archiveStream.Position += bytesToDiscard;
}
else if (bytesToDiscard > 0)
ValueTask vt = AdvanceStreamCoreAsync<SyncReadWriteAdapter>(archiveStream, bytesToDiscard, CancellationToken.None);
Debug.Assert(vt.IsCompleted, "Synchronous AdvanceStream completed asynchronously.");
vt.GetAwaiter().GetResult();
}

// Asynchronously helps advance the stream a total number of bytes larger than int.MaxValue.
internal static ValueTask AdvanceStreamAsync(Stream archiveStream, long bytesToDiscard, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(minimumLength: (int)Math.Min(MaxBufferLength, bytesToDiscard));
while (bytesToDiscard > 0)
{
int currentLengthToRead = (int)Math.Min(MaxBufferLength, bytesToDiscard);
archiveStream.ReadExactly(buffer.AsSpan(0, currentLengthToRead));
bytesToDiscard -= currentLengthToRead;
}
ArrayPool<byte>.Shared.Return(buffer);
return ValueTask.FromCanceled(cancellationToken);
}

return AdvanceStreamCoreAsync<AsyncReadWriteAdapter>(archiveStream, bytesToDiscard, cancellationToken);
}

Comment on lines 53 to 70

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need those two as separate helpers?

@alinpahontu2912 alinpahontu2912 Jun 12, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't touched gnuspsarsestream, which is where these two are used. the biggest code chunk in there, parsesparsemap, uses the isasync bool. Should I refactor it too?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes,

// Asynchronously helps advance the stream a total number of bytes larger than int.MaxValue.
internal static async ValueTask AdvanceStreamAsync(Stream archiveStream, long bytesToDiscard, CancellationToken cancellationToken)
internal static async ValueTask AdvanceStreamCoreAsync<TAdapter>(Stream archiveStream, long bytesToDiscard, CancellationToken cancellationToken)
where TAdapter : IReadWriteAdapter
{
cancellationToken.ThrowIfCancellationRequested();

Expand All @@ -80,45 +80,44 @@ internal static async ValueTask AdvanceStreamAsync(Stream archiveStream, long by
else if (bytesToDiscard > 0)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(minimumLength: (int)Math.Min(MaxBufferLength, bytesToDiscard));
while (bytesToDiscard > 0)
try
{
int currentLengthToRead = (int)Math.Min(MaxBufferLength, bytesToDiscard);
await archiveStream.ReadExactlyAsync(buffer, 0, currentLengthToRead, cancellationToken).ConfigureAwait(false);
bytesToDiscard -= currentLengthToRead;
while (bytesToDiscard > 0)
{
int currentLengthToRead = (int)Math.Min(MaxBufferLength, bytesToDiscard);
await TAdapter.ReadExactlyAsync(archiveStream, buffer.AsMemory(0, currentLengthToRead), cancellationToken).ConfigureAwait(false);
bytesToDiscard -= currentLengthToRead;
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
ArrayPool<byte>.Shared.Return(buffer);
}
}

// Helps copy a specific number of bytes from one stream into another.
internal static void CopyBytes(Stream origin, Stream destination, long bytesToCopy)
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(minimumLength: (int)Math.Min(MaxBufferLength, bytesToCopy));
while (bytesToCopy > 0)
{
int currentLengthToRead = (int)Math.Min(MaxBufferLength, bytesToCopy);
origin.ReadExactly(buffer.AsSpan(0, currentLengthToRead));
destination.Write(buffer.AsSpan(0, currentLengthToRead));
bytesToCopy -= currentLengthToRead;
}
ArrayPool<byte>.Shared.Return(buffer);
}

// Asynchronously helps copy a specific number of bytes from one stream into another.
internal static async ValueTask CopyBytesAsync(Stream origin, Stream destination, long bytesToCopy, CancellationToken cancellationToken)
internal static async ValueTask CopyBytesCoreAsync<TAdapter>(Stream origin, Stream destination, long bytesToCopy, CancellationToken cancellationToken)
where TAdapter : IReadWriteAdapter
{
cancellationToken.ThrowIfCancellationRequested();

byte[] buffer = ArrayPool<byte>.Shared.Rent(minimumLength: (int)Math.Min(MaxBufferLength, bytesToCopy));
while (bytesToCopy > 0)
try
{
while (bytesToCopy > 0)
{
int currentLengthToRead = (int)Math.Min(MaxBufferLength, bytesToCopy);
Memory<byte> memory = buffer.AsMemory(0, currentLengthToRead);
await TAdapter.ReadExactlyAsync(origin, memory, cancellationToken).ConfigureAwait(false);
await TAdapter.WriteAsync(destination, memory, cancellationToken).ConfigureAwait(false);
bytesToCopy -= currentLengthToRead;
}
}
finally
{
int currentLengthToRead = (int)Math.Min(MaxBufferLength, bytesToCopy);
Memory<byte> memory = buffer.AsMemory(0, currentLengthToRead);
await origin.ReadExactlyAsync(buffer, 0, currentLengthToRead, cancellationToken).ConfigureAwait(false);
await destination.WriteAsync(memory, cancellationToken).ConfigureAwait(false);
bytesToCopy -= currentLengthToRead;
ArrayPool<byte>.Shared.Return(buffer);
}
ArrayPool<byte>.Shared.Return(buffer);
}

// Returns the number of bytes until the next multiple of the record size.
Expand Down Expand Up @@ -293,22 +292,11 @@ internal static ReadOnlySpan<byte> TrimNullTerminated(ReadOnlySpan<byte> buffer)
// After the file contents, there may be zero or more null characters,
// which exist to ensure the data is aligned to the record size. Skip them and
// set the stream position to the first byte of the next entry.
internal static int SkipBlockAlignmentPadding(Stream archiveStream, long size)
{
int bytesToSkip = CalculatePadding(size);
AdvanceStream(archiveStream, bytesToSkip);
return bytesToSkip;
}

// After the file contents, there may be zero or more null characters,
// which exist to ensure the data is aligned to the record size.
// Asynchronously skip them and set the stream position to the first byte of the next entry.
internal static async ValueTask<int> SkipBlockAlignmentPaddingAsync(Stream archiveStream, long size, CancellationToken cancellationToken)
internal static async ValueTask<int> SkipBlockAlignmentPaddingCoreAsync<TAdapter>(Stream archiveStream, long size, CancellationToken cancellationToken)
where TAdapter : IReadWriteAdapter
{
cancellationToken.ThrowIfCancellationRequested();

int bytesToSkip = CalculatePadding(size);
await AdvanceStreamAsync(archiveStream, bytesToSkip, cancellationToken).ConfigureAwait(false);
await AdvanceStreamCoreAsync<TAdapter>(archiveStream, bytesToSkip, cancellationToken).ConfigureAwait(false);
return bytesToSkip;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace System.Formats.Tar
{
// Static-abstract adapter that lets a single generic implementation serve both
// synchronous and asynchronous Tar code paths. The sync entry point passes
// SyncReadWriteAdapter and asserts the returned ValueTask is already completed;
// the async entry point passes AsyncReadWriteAdapter. Mirrors the IReadWriteAdapter
// pattern used by SslStream / SmtpClient (see src/libraries/Common/src/System/Net/ReadWriteAdapter.cs).
internal interface IReadWriteAdapter

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to duplicate the adapter? can't we reuse/enhance the existing adapter we use in SslStream?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is possible, probably need to update the csproj file

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a good idea to add a reference the the netwroking project here though ? I think the adapters are also slightly different, since tar doesn't need the waitasync, readatleast and ssl doesn't need advancetoend/readexactly from what I can see, so maybe it's not worth combining them ? Or maybe we should create something super generic in a common project and reuse that adapter everywhere?

{
static abstract ValueTask<int> ReadAsync(Stream stream, Memory<byte> buffer, CancellationToken cancellationToken);
Comment thread
alinpahontu2912 marked this conversation as resolved.
static abstract ValueTask ReadExactlyAsync(Stream stream, Memory<byte> buffer, CancellationToken cancellationToken);
static abstract ValueTask WriteAsync(Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken);
static abstract ValueTask CopyToAsync(Stream source, Stream destination, CancellationToken cancellationToken);
static abstract ValueTask AdvanceToEndAsync(SubReadStream stream, CancellationToken cancellationToken);
static abstract ValueTask DisposeAsync(Stream stream);
}

internal readonly struct AsyncReadWriteAdapter : IReadWriteAdapter
{
public static ValueTask<int> ReadAsync(Stream stream, Memory<byte> buffer, CancellationToken cancellationToken) =>
stream.ReadAsync(buffer, cancellationToken);

public static ValueTask ReadExactlyAsync(Stream stream, Memory<byte> buffer, CancellationToken cancellationToken) =>
stream.ReadExactlyAsync(buffer, cancellationToken);

public static ValueTask WriteAsync(Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken) =>
stream.WriteAsync(buffer, cancellationToken);

public static ValueTask CopyToAsync(Stream source, Stream destination, CancellationToken cancellationToken) =>
new ValueTask(source.CopyToAsync(destination, cancellationToken));

public static ValueTask AdvanceToEndAsync(SubReadStream stream, CancellationToken cancellationToken) =>
stream.AdvanceToEndAsync(cancellationToken);

public static ValueTask DisposeAsync(Stream stream) => stream.DisposeAsync();
Comment on lines +27 to +42

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add awaits to help runtime async? Or are they not necessary?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is work in progress to make forwards like this work well in runtime async.

Note that await changes observable behaviors: It wraps exceptions in the Task and it saves/restores async context.

}

internal readonly struct SyncReadWriteAdapter : IReadWriteAdapter
{
public static ValueTask<int> ReadAsync(Stream stream, Memory<byte> buffer, CancellationToken cancellationToken) =>
new ValueTask<int>(stream.Read(buffer.Span));

public static ValueTask ReadExactlyAsync(Stream stream, Memory<byte> buffer, CancellationToken cancellationToken)
{
stream.ReadExactly(buffer.Span);
return default;
}

public static ValueTask WriteAsync(Stream stream, ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
{
stream.Write(buffer.Span);
return default;
}

public static ValueTask CopyToAsync(Stream source, Stream destination, CancellationToken cancellationToken)
{
source.CopyTo(destination);
return default;
}

public static ValueTask AdvanceToEndAsync(SubReadStream stream, CancellationToken cancellationToken)
{
stream.AdvanceToEnd();
return default;
}

public static ValueTask DisposeAsync(Stream stream)
{
stream.Dispose();
return default;
}
}
}
Loading