Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,5 @@ $tf/pendingchanges.tfb
$tf/properties.tf1
project.lock.json
**/.vs/

.claude/
12 changes: 11 additions & 1 deletion Sources/.editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,14 @@ csharp_new_line_before_catch = true
csharp_new_line_before_finally = true
csharp_indent_case_contents = true
csharp_indent_switch_labels = true
csharp_preserve_single_line_statements = false
csharp_preserve_single_line_statements = false
[*.cs]

# Default severity for analyzer diagnostics with category 'StyleCop.CSharp.NamingRules'
dotnet_analyzer_diagnostic.category-StyleCop.CSharp.NamingRules.severity = none

# Default severity for all analyzer diagnostics
dotnet_analyzer_diagnostic.severity = none

# SA1314: Type parameter names should begin with T
dotnet_diagnostic.SA1314.severity = none
10 changes: 0 additions & 10 deletions Sources/Core/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,5 @@
<Import Project="$([MSBuild]::GetPathOfFileAbove('Directory.Build.props', '$(MSBuildThisFileDirectory)../'))" />

<!-- Code Analysis -->
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0' And '$(Configuration)' == 'Debug'">
<PackageReference Include="StyleCop.Analyzers" Version="1.0.2">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
</PackageReference>
</ItemGroup>
<PropertyGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<!-- Do not set RunCodeAnalysis*, as this is incompatible with netstandard2.0. Analyzers run by default whenever included. -->
<CodeAnalysisRuleSet>$(EnlistmentRoot)\Sources\Microsoft.StreamProcessing.ruleset</CodeAnalysisRuleSet>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<TargetFrameworks>net10.0</TargetFrameworks>
<Platforms>x64;AnyCPU</Platforms>
</PropertyGroup>

Expand All @@ -11,19 +11,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.CSharp" Version="4.5.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.CodeAnalysis.Scripting" Version="3.1.0" />
<PackageReference Include="System.Diagnostics.Contracts" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.Process" Version="4.3.0" />
<PackageReference Include="System.Runtime.Serialization.Primitives" Version="4.3.0" />
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)'=='netstandard2.0'">
<PackageReference Include="System.Runtime.Loader" Version="4.3.0" />
<PackageReference Include="Microsoft.CodeAnalysis.Scripting" Version="5.3.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ public TValue GetOrAdd(CacheKey key, Func<CacheKey, TValue> valueFactory)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public IEnumerator<KeyValuePair<CacheKey, TValue>> GetEnumerator() => this.dictionary.GetEnumerator();

/// <summary>
/// Clears all entries from the dictionary and the per-key lock table.
/// Marked internal (not private) so that test code can clear the codegen cache
/// (e.g. EquiJoinStreamable.cachedPipes) to ensure deterministic test behavior
/// without relying on reflection.
/// </summary>
internal void Clear()
{
this.dictionary.Clear();
this.keyLocks.Clear();
}

IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<TargetFrameworks>net10.0</TargetFrameworks>
<Platforms>x64;AnyCPU</Platforms>
</PropertyGroup>

Expand All @@ -11,14 +11,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.CSharp" Version="4.5.0" />
<PackageReference Include="Microsoft.CodeAnalysis.Scripting" Version="3.1.0" />
<PackageReference Include="System.Diagnostics.Contracts" Version="4.3.0" />
<PackageReference Include="System.Diagnostics.Process" Version="4.3.0" />
<PackageReference Include="System.Linq.Expressions" Version="4.3.0" />
<PackageReference Include="System.Runtime.Serialization.Primitives" Version="4.3.0" />
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
<PackageReference Include="System.Runtime.Loader" Version="4.3.0" />
<PackageReference Include="Microsoft.CodeAnalysis.Scripting" Version="5.3.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ namespace Microsoft.StreamProcessing
{
internal sealed class EquiJoinStreamable<TKey, TLeft, TRight, TResult> : BinaryStreamable<TKey, TLeft, TRight, TResult>
{
private static readonly SafeConcurrentDictionary<Tuple<Type, string>> cachedPipes
= new SafeConcurrentDictionary<Tuple<Type, string>>();
// Internal (not private) so test code can call cachedPipes.Clear() to ensure
// deterministic behavior in tests that depend on a fresh codegen compile.
internal static readonly SafeConcurrentDictionary<Tuple<Type, string>> cachedPipes
= new SafeConcurrentDictionary<Tuple<Type, string>>();

private readonly JoinKind joinKind;
private readonly Func<CacheKey, Tuple<Type, string>> columnarGenerator;
Expand Down
3 changes: 3 additions & 0 deletions Sources/Core/Microsoft.StreamProcessing/Pipes/BinaryPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ public override void OnCompleted()
Monitor.Enter(this.sync);
try
{
// Process any batches that were enqueued while another thread held the lock above.
// Monitor is reentrant, so ProcessPendingBatches()'s TryEnter will succeed here.
ProcessPendingBatches();
base.OnCompleted();

while (this.leftQueue.TryDequeue(out var leftBatch))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Licensed under the MIT License
// *********************************************************************
using System;
using System.Buffers;
using System.IO;
using System.Runtime.Serialization;
using System.Text;
Expand Down Expand Up @@ -110,18 +111,15 @@ public ulong DecodeULong()

public float DecodeFloat()
{
var value = new byte[4];
Span<byte> value = stackalloc byte[4];
ReadAllRequiredBytes(value);
if (!BitConverter.IsLittleEndian)
{
Array.Reverse(value);
}
return BitConverter.ToSingle(value, 0);
if (!BitConverter.IsLittleEndian) value.Reverse();
return BitConverter.ToSingle(value);
}

public double DecodeDouble()
{
var value = new byte[8];
Span<byte> value = stackalloc byte[8];
ReadAllRequiredBytes(value);
long longValue = value[0]
| (long)value[1] << 0x8
Expand All @@ -142,7 +140,21 @@ public byte[] DecodeByteArray()
return array;
}

public string DecodeString() => Encoding.UTF8.GetString(DecodeByteArray());
public string DecodeString()
{
int byteCount = DecodeInt();
if (byteCount == 0) return string.Empty;
var rented = ArrayPool<byte>.Shared.Rent(byteCount);
try
{
ReadAllRequiredBytes(rented.AsSpan(0, byteCount));
return Encoding.UTF8.GetString(rented, 0, byteCount);
}
finally
{
ArrayPool<byte>.Shared.Return(rented);
}
}

public int DecodeArrayChunk()
{
Expand All @@ -157,9 +169,9 @@ public int DecodeArrayChunk()

public Guid DecodeGuid()
{
var array = new byte[16];
ReadAllRequiredBytes(array);
return new Guid(array);
Span<byte> value = stackalloc byte[16];
ReadAllRequiredBytes(value);
return new Guid(value);
}

private void ReadAllRequiredBytes(byte[] array)
Expand All @@ -172,15 +184,26 @@ private void ReadAllRequiredBytes(byte[] array)
}
}

private void ReadAllRequiredBytes(Span<byte> span)
{
int totalRead = 0;
while (totalRead < span.Length)
{
int read = this.stream.Read(span.Slice(totalRead));
if (read == 0)
throw new SerializationException($"Unexpected end of stream: '{span.Length - totalRead}' bytes missing.");
totalRead += read;
}
}

private int ReadIntFixed()
{
var value = new byte[4];
this.stream.ReadAllRequiredBytes(value, 0, value.Length);
int intValue = value[0]
Span<byte> value = stackalloc byte[4];
ReadAllRequiredBytes(value);
return value[0]
| value[1] << 0x8
| value[2] << 0x10
| value[3] << 0x18;
return intValue;
}

public unsafe T[] DecodeArray<T>() where T : struct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Licensed under the MIT License
// *********************************************************************
using System;
using System.Buffers;
using System.IO;
using System.Text;
using Microsoft.StreamProcessing.Internal;
Expand Down Expand Up @@ -83,13 +84,10 @@ public void Encode(ulong value)

public void Encode(float value)
{
byte[] bytes = BitConverter.GetBytes(value);
if (!BitConverter.IsLittleEndian)
{
Array.Reverse(bytes);
}

this.stream.Write(bytes, 0, bytes.Length);
Span<byte> bytes = stackalloc byte[4];
BitConverter.TryWriteBytes(bytes, value);
if (!BitConverter.IsLittleEndian) bytes.Reverse();
this.stream.Write(bytes);
}

public void Encode(double value)
Expand All @@ -113,12 +111,40 @@ public void Encode(byte[] value)
if (value.Length > 0) this.stream.Write(value, 0, value.Length);
}

public void Encode(ReadOnlySpan<byte> span)
{
int count = span.Length;
Encode(count);
if (count > 0)
{
this.stream.Write(span);
}
}

public void Encode(string value)
=> Encode(Encoding.UTF8.GetBytes(value ?? throw new ArgumentNullException(nameof(value))));
{
if (value == null) throw new ArgumentNullException(nameof(value));
int byteCount = Encoding.UTF8.GetByteCount(value);
var rented = ArrayPool<byte>.Shared.Rent(byteCount);
try
{
int written = Encoding.UTF8.GetBytes(value, rented);
Encode(rented.AsSpan(0, written));
}
finally
{
ArrayPool<byte>.Shared.Return(rented);
}
}

public void EncodeArrayChunk(int size) => Encode(size);

public void Encode(Guid value) => this.stream.Write(value.ToByteArray(), 0, 16);
public void Encode(Guid value)
{
Span<byte> bytes = stackalloc byte[16];
value.TryWriteBytes(bytes);
this.stream.Write(bytes);
}

private void WriteIntFixed(int encodedValue)
{
Expand Down
14 changes: 10 additions & 4 deletions Sources/Core/Microsoft.StreamProcessing/Utilities/Config.cs
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,12 @@ public static string Describe()
// in VSTest, which is a good thing, since Config is static and those tests may clash otherwise.
internal sealed class ConfigModifier
{
// lockable gate allowing only one ConfigModifier active at a time
private static readonly object gate = new object();
// Serializes concurrent ConfigModifier usage across tests.
// SemaphoreSlim instead of Monitor so that async tests can release from a different thread.
// AsyncLocal depth counter makes it re-entrant within the same async call context (e.g. nested
// using blocks within a single test) without blocking on the semaphore a second time.
private static readonly SemaphoreSlim gate = new SemaphoreSlim(1, 1);
private static readonly AsyncLocal<int> gateDepth = new AsyncLocal<int>();

// collection of Config modifications
private readonly List<IGatedModification> modifications = new List<IGatedModification>();
Expand Down Expand Up @@ -723,7 +727,8 @@ public ConfigModifier SerializationCompressionLevel(SerializationCompressionLeve

public IDisposable Modify()
{
Monitor.Enter(gate);
if (gateDepth.Value == 0) gate.Wait();
gateDepth.Value++;
foreach (var m in this.modifications)
m.Modify();

Expand All @@ -732,7 +737,8 @@ public IDisposable Modify()
foreach (var m in this.modifications)
m.Modify();

Monitor.Exit(gate);
gateDepth.Value--;
if (gateDepth.Value == 0) gate.Release();
});
}

Expand Down
12 changes: 10 additions & 2 deletions Sources/Core/Microsoft.StreamProcessing/Utilities/Native32.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,16 @@ internal static void AffinitizeThread(int processor)
{
if (utid == pt.Id)
{
long AffinityMask = 1 << processor;
pt.ProcessorAffinity = (IntPtr)(AffinityMask); // Set affinity for this
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
{
pt.IdealProcessor = processor;
}

if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
long AffinityMask = 1 << processor;
pt.ProcessorAffinity = (IntPtr)(AffinityMask); // Set affinity for this
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/Test/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="StyleCop.Analyzers" Version="1.0.2" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.118" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion Sources/Test/Microsoft.StreamProcessing.Test.ruleset
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
<Include Path="..\Microsoft.StreamProcessing.ruleset" Action="Default" />

<Rules AnalyzerId="StyleCop.Analyzers" RuleNamespace="StyleCop.Analyzers">
<!-- Override any rules here -->
<Rule Id="SA1137" Action="None" /> <!-- Generated T4 files do not guarantee consistent indentation -->
</Rules>
</RuleSet>
Loading