Skip to content

[Feature]: MaximumBy #954

@fubar-coder

Description

@fubar-coder

Describe the functionality desired 🐞

The library has a Maximum operator, but no MaximumBy, which allows the aggregation of a TObject using a key selector.

The steps the functionality will provide

Example

Usage

var data = new SourceList<TestItem>();

var output = new List<string>();
using var _ = data.Connect()
    .AutoRefresh()
    .MaximumBy(x => x.Name)
    .Subscribe(x =>
    {
        output.Add(x.Name);
        Console.WriteLine(x.Name);
    });

var item1 = new TestItem("b");

data.Add(item1);
data.Add(new TestItem("a"));

Debug.Assert(output.Count == 2);
Debug.Assert(output[0] == "b");
Debug.Assert(output[1] == "b");

data.Add(new TestItem("c"));
Debug.Assert(output.Count == 3);
Debug.Assert(output[2] == "c");

item1.Name = "d";
Debug.Assert(output.Count == 4);
Debug.Assert(output[3] == "d");

Test object

public class TestItem(string name) : INotifyPropertyChanged
{
    private string _name = name;

    public event PropertyChangedEventHandler? PropertyChanged;

    public string Name
    {
        get => _name;
        set => SetField(ref _name, value);
    }

    protected virtual void OnPropertyChanged([CallerMemberName] string? propertyName = null)
    {
        PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(propertyName));
    }

    protected bool SetField<T>(ref T field, T value, [CallerMemberName] string? propertyName = null)
    {
        if (EqualityComparer<T>.Default.Equals(field, value)) return false;
        field = value;
        OnPropertyChanged(propertyName);
        return true;
    }
}

Extension class

/// <summary>
/// Methods to extend the DynamicData library.
/// </summary>
public static class DynamicDataExtensions
{
    /// <summary>
    /// Performs a maximum operation on the source.
    /// </summary>
    /// <param name="source">The source.</param>
    /// <param name="keySelector">The key selector for the object to get the maximum for.</param>
    /// <typeparam name="TObject">The object type to get the maximum for.</typeparam>
    /// <typeparam name="TKey">The key selector to determine the maximum.</typeparam>
    /// <returns>The observable returning the maximum object.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static IObservable<TObject> MaximumBy<TObject, TKey>(
        this IObservable<IChangeSet<TObject>> source,
        Func<TObject, TKey> keySelector)
        where TObject : notnull
    {
        return source.MaximumBy(keySelector, null);
    }

    /// <summary>
    /// Alternative implementation of the <see cref="MaxEx.Maximum{TObject,TResult}"/> method, which
    /// uses a comparer for the object itself.
    /// </summary>
    /// <param name="source">The source.</param>
    /// <param name="comparer">The comparer for the source object.</param>
    /// <typeparam name="TObject">The object type to get the maximum for.</typeparam>
    /// <returns>The observable returning the maximum object.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static IObservable<TObject> Maximum<TObject>(
        this IObservable<IChangeSet<TObject>> source,
        IComparer<TObject>? comparer)
        where TObject : notnull
    {
        return source.MaximumBy(static x => x, comparer);
    }

    /// <summary>
    /// Performs a maximum operation on the source using both a key selector and comparer.
    /// </summary>
    /// <param name="source">The source.</param>
    /// <param name="keySelector">The key selector for the object to get the maximum for.</param>
    /// <param name="comparer">The comparer for the source object.</param>
    /// <typeparam name="TObject">The object type to get the maximum for.</typeparam>
    /// <typeparam name="TKey">The key selector to determine the maximum.</typeparam>
    /// <returns>The observable returning the maximum object.</returns>
    public static IObservable<TObject> MaximumBy<TObject, TKey>(
        this IObservable<IChangeSet<TObject>> source,
        Func<TObject, TKey> keySelector,
        IComparer<TKey>? comparer)
        where TObject : notnull
    {
        comparer ??= Comparer<TKey>.Default;
        return source
            .ToChangesAndCollection()
            .Scan(
                AggregationState<TObject, TKey>.Empty,
                (state, latest) =>
                {
                    var current = state;
                    var requiresReset = false;

                    using var changesEnumerator = latest.Changes.GetEnumerator();
                    if (changesEnumerator.MoveNext())
                    {
                        do
                        {
                            var change = changesEnumerator.Current;
                            var value = keySelector(change.Item);

                            if (change.Type == AggregateType.Add)
                            {
                                // Set a new current item, when the current item was not set,
                                // or if the comparer indicates a new maximum value.
                                if (!current.IsSet || comparer.Compare(value, current.Key) > 0)
                                {
                                    current = new AggregationState<TObject, TKey>(change.Item, value);
                                }
                            }
                            else
                            {
                                // check whether the max / min has been removed. If so we need to look
                                // up the latest from the underlying collection
                                if (!current.IsSet)
                                {
                                    // Value was not set, but we have a remove change. This means that the
                                    // something went wrong???
                                    continue;
                                }

                                if (comparer.Compare(value, current.Key) != 0)
                                {
                                    continue;
                                }

                                requiresReset = true;
                                break;
                            }
                        } while (changesEnumerator.MoveNext());
                    }
                    else
                    {
                        // An empty enumeration occurs when <c>AutoRefresh</c> is used and the
                        // underlying item has been changed.
                        requiresReset = true;
                    }

                    // Do we need to process the whole collection?
                    // This may happen due to a refresh or a removal of the max item.
                    if (requiresReset)
                    {
                        var collection = latest.Collection;
                        if (collection.Count == 0)
                        {
                            current = AggregationState<TObject, TKey>.Empty;
                        }
                        else
                        {
                            var found = collection.MaxBy(keySelector);
                            current = found is null
                                ? AggregationState<TObject, TKey>.Empty
                                : new AggregationState<TObject, TKey>(found, keySelector(found));
                        }
                    }

                    return current;
                })
            .Where(x => x.IsSet)
            .Select(x => x.Current!);
    }

    /// <summary>
    /// Copied from the original DynamicData source code.
    /// </summary>
    /// <param name="source"></param>
    /// <typeparam name="TObject"></typeparam>
    /// <returns></returns>
    private static IObservable<ChangesAndCollection<TObject>> ToChangesAndCollection<TObject>(
        this IObservable<IChangeSet<TObject>> source)
        where TObject : notnull
    {
        return source.Publish(
            shared =>
            {
                var changes = shared.ForAggregation();
                var data = shared.ToCollection();
                return data.Zip(changes, (d, c) => new ChangesAndCollection<TObject>(c, d));
            });
    }

    /// <summary>
    /// Contains the current maximum value and its key.
    /// </summary>
    /// <typeparam name="TObject"></typeparam>
    /// <typeparam name="TKey"></typeparam>
    private sealed class AggregationState<TObject, TKey>
        where TObject : notnull
    {
        /// <summary>
        /// Gets an empty instance of the <see cref="AggregationState{TObject, TKey}"/> class.
        /// </summary>
        public static AggregationState<TObject, TKey> Empty { get; } = new();

        /// <summary>
        /// Initializes a new instance of the <see cref="AggregationState{TObject, TKey}"/> class.
        /// </summary>
        private AggregationState()
            : this(false, default, default)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="AggregationState{TObject, TKey}"/> class.
        /// </summary>
        /// <param name="current">The current value.</param>
        /// <param name="key">The key of the current value.</param>
        public AggregationState(TObject current, TKey key)
            : this(true, current, key)
        {
        }

        /// <summary>
        /// Initializes a new instance of the <see cref="AggregationState{TObject, TKey}"/> class.
        /// </summary>
        /// <param name="isSet">Determines whether the current value is set.</param>
        /// <param name="current">The current value.</param>
        /// <param name="key">The key of the current value.</param>
        private AggregationState(bool isSet, TObject? current, TKey? key)
        {
            IsSet = isSet;
            Current = current;
            Key = key;
        }

        /// <summary>
        /// Gets a value indicating whether this instance is set to a non-null value and key.
        /// </summary>
        [MemberNotNullWhen(true, nameof(Current), nameof(Key))]
        public bool IsSet { get; }

        /// <summary>
        /// Gets the value of the current maximum item.
        /// </summary>
        public TObject? Current { get; }

        /// <summary>
        /// Gets the key of the current maximum item.
        /// </summary>
        public TKey? Key { get; }
    }

    /// <summary>
    /// Copied from the original DynamicData source code.
    /// </summary>
    /// <param name="changes"></param>
    /// <param name="collection"></param>
    /// <typeparam name="T"></typeparam>
    private sealed class ChangesAndCollection<T>(IAggregateChangeSet<T> changes, IReadOnlyCollection<T> collection)
    {
        public IAggregateChangeSet<T> Changes => changes;

        public IReadOnlyCollection<T> Collection => collection;
    }
}

Considerations

I was looking at the Maximum implementation, but ran into two problems:

  1. I was unable to specify a comparer
  2. I was unable to get the maximum based on a property of the source object

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions