Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 70 additions & 62 deletions src/Microsoft.ML/LearningPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using static Microsoft.ML.Runtime.DefaultEnvironment;

namespace Microsoft.ML
{
Expand Down Expand Up @@ -48,15 +49,26 @@ public ScorerPipelineStep(Var<IDataView> data, Var<ITransformModel> model)
[DebuggerTypeProxy(typeof(LearningPipelineDebugProxy))]
public class LearningPipeline : ICollection<ILearningPipelineItem>
{
readonly internal IHostEnvironment Env;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Env [](start = 43, length = 3)

HostEnvironmentBase just happens to be IDisposable.
Since this design makes the pipeline own an instance of Env, your have to make the class disposable,
Alternatively, you can take an optional environment and then the problem of owning env becomse the callers problem.
If a user has to pass an environment to this class there are several potental benefits:

  1. They can create custom, different environments and customize other things such as logging
  2. That can share environments between different experiments
  3. If the pipeline is hosted within a macro, such as CV or Sweep, then the partent container of Pipeline can own the overall env.

private List<ILearningPipelineItem> Items { get; } = new List<ILearningPipelineItem>();

/// <summary>
/// Construct an empty <see cref="LearningPipeline"/> object.
/// </summary>
public LearningPipeline()
public LearningPipeline(int? seed = null, int concurrency = 0)
Copy link
Contributor

@TomFinley TomFinley May 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LearningPipeline(int? seed = null, int concurrency = 0) [](start = 15, length = 55)

Hey Ivan, thanks much for addressing this!

Seed and concurrency are fine... but in other contexts we've also talked about the need to change the output and error text readers, plus of course the progress indicators. :)

The reason I mention this is: whatever solution you come up with that actually captures all these important things will, I am quite certain, wind up being almost completely isomorphic to IHostEnvironment. Let's save ourselves a little bit of time, deal with the controversy now, and just make the facilities for solving this problem an IHostEnvironment instance. If need be we can introduce one or two conveniences, I don't object, but let's cross that bridge now. :P What do you think?

Copy link
Contributor Author

@Ivanidzo4ka Ivanidzo4ka May 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be confusing for user to have method Register, StartProgressChannel and so on for LearningPipeline class?
Two events, one for normal message(string + kind), one for progress, so user can subscribe for them and decide what to do with them. Seed, concurrency in constructor looks more than enough from my abstract user perspective.


In reply to: 187756624 [](ancestors = 187756624)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I second Tom's sentiment. Why not just take an optional IHostEnvironment argument? There are several benefits in addition to support of the needed functionality:

  1. Ability to control experiment's output
  2. Ability to host more than one concurrent experiment and be able to separate outputs from each. (imagine a service runs multiple traning experiments. It would allow user to see output of a single experiment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you have that ability here :) just subscribe to event, and handle messages as you like.

I'm actually not sure how its possible with current API and how IHostEnvironment argument will help you in that.


In reply to: 187758015 [](ancestors = 187758015)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and btw Tom suggest to make LearningPipeline:IHostEnvironment and you suggest LearningPipeline(IHostEnvironment env) :)


In reply to: 187758499 [](ancestors = 187758499,187758015)

Copy link
Contributor

@veikkoeeva veikkoeeva May 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if MachineLearning.Net used what's baked in the framework more or less (like MEL), Ivan could configure https://github.com/serilog/serilog-sinks-console to log into console. Or to file, or both, right now. Or use Log4net and do whatever he does there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@veikkoeeva ... sure, and if we want to come up with an implementation of IHostEnvironment that does this, that's great. Baking that into the interface itself though would be a mistake, because that would basically make IHostEnvironment a logging framework, and it isn't one. But just having yet another implementation of IHostEnvironment, perhaps a small modification of Ivan's work, would allow you to do what you propose. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TomFinley I have a hunch that the implementation of IHostingEnvironment could be any DI framework out there and it'd be a case of refactoring ML.NET a bit and then people would bring their own (or use what's included if they don't need anything sophisticated, like in ASP.NET Core). I may be wrong and hence hesistant to make absolute claims. It also may make time before I can state anything in absolute terms, as in takes time to work with code and understand it better to propose concrete, well defined action, though maybe role could be to just ask the rationale for current systems to suss out improvements.

For the sake of discussion, one way of seeing a DI framework is that it's a container to which one composes the object graph – as in a tree rooted in the container. Then it allows one to switch parts with given parameters: single instance ever resolved, per-thread instances, per-call instances, instances parametrized in some other way. May seem complicated as written like this, but isn't. One example here: http://autofaccn.readthedocs.io/en/latest/getting-started/#application-startup. For ASP.NET Core, for instance, it's explained at https://docs.microsoft.com/en-us/aspnet/core/fundamentals/dependency-injection?view=aspnetcore-2.0.

The benefit is that the object graph is clearly defined. Further, if one makes it so that injection happens via constructors only and the data injected is as much as readonly as possible, the invariant can be checked in the constructor. Going further, if the functions work so that all arguments are via parameter passing and all output vie return values, the system should be rather easy to refactor, test, difficult to break and likely easier to extend. It's what I wrote at #168 (comment) too. I also saw #208 touch upon this also.

The current system has some of these benefits, but in unfamiliar form, and mightn't be as flexible as it could with DI. Points:

  1. This arrangement might allow easier testing (also all kinds of extensions), especially with property based testing tools like IntelliTest, or, say, FsCheck.
  2. Likely easier to test calculations and whatnot in parts and in isolation. Performance, correctness, what have you. One point to note is floating point determinisnim that might become an issue.
  3. Composing the pipelines with the aforementioned way might make it easier to compose with F#, i.e. algebraic types/domain modelling too, but I'm not well versed enough in F# to tell (maybe @isaacabraham has an opinion). The aforementioned could work well with function composition too (without going into partial application over the whole program state as often happens in C# when with inheritance hierarchies results go out from the inheritors, to exaggerate a bit).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TomFinley To answer directly to

But just having yet another implementation of IHostEnvironment, perhaps a small modification of Ivan's work, would allow you to do what you propose. What do you think?

Maybe you have an opinion already since you ask an opinion to a small modification to Ivan's work?

Copy link
Contributor

@TomFinley TomFinley May 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @veikkoeeva , sure. Put another way? Does that solution make people happy or unhappy that want this thing to plug into their favorite logging framework, whatever that may be? If unhappy, maybe an alternate specific solution should be proposed.

I think dependency injection is another kettle of fish. You mentioned MEF. That might be a good thing to use, I just don't know. It certainly has many of the features the component catalog needs. It would be nice if there could be a replacement. Whether it's suitable though would be an investigation.

{

var env = new DefaultEnvironment(seed: seed, conc: concurrency);
env.MessageRecieved += Env_MessageRecieved;
Env = env;
}

private void Env_MessageRecieved(object sender, ChannelMessageEventArgs e)
{
MessageOccured?.Invoke(this, e);
}

public event EventHandler<ChannelMessageEventArgs> MessageOccured;
Copy link
Contributor

@TomFinley TomFinley May 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good example of how this class is morphing to become host environment like. We create a facility identical to that of the environment you create, but we choose to rename it from MessageReceived to MessageOccured. I'm not sure this wrapping is any more clear than if we just used the same abstraction everywhere in the codebase.

/// <summary>
/// Get the count of ML components in the <see cref="LearningPipeline"/> object
/// </summary>
Expand Down Expand Up @@ -137,80 +149,76 @@ public PredictionModel<TInput, TOutput> Train<TInput, TOutput>()
where TInput : class
where TOutput : class, new()
{
Experiment experiment = Env.CreateExperiment();
ILearningPipelineStep step = null;
List<ILearningPipelineLoader> loaders = new List<ILearningPipelineLoader>();
List<Var<ITransformModel>> transformModels = new List<Var<ITransformModel>>();
Var<ITransformModel> lastTransformModel = null;

using (var environment = new TlcEnvironment())
foreach (ILearningPipelineItem currentItem in this)
{
Experiment experiment = environment.CreateExperiment();
ILearningPipelineStep step = null;
List<ILearningPipelineLoader> loaders = new List<ILearningPipelineLoader>();
List<Var<ITransformModel>> transformModels = new List<Var<ITransformModel>>();
Var<ITransformModel> lastTransformModel = null;
if (currentItem is ILearningPipelineLoader loader)
loaders.Add(loader);

step = currentItem.ApplyStep(step, experiment);
if (step is ILearningPipelineDataStep dataStep && dataStep.Model != null)
transformModels.Add(dataStep.Model);

foreach (ILearningPipelineItem currentItem in this)
else if (step is ILearningPipelinePredictorStep predictorDataStep)
{
if (currentItem is ILearningPipelineLoader loader)
loaders.Add(loader);

step = currentItem.ApplyStep(step, experiment);
if (step is ILearningPipelineDataStep dataStep && dataStep.Model != null)
transformModels.Add(dataStep.Model);

else if (step is ILearningPipelinePredictorStep predictorDataStep)
if (lastTransformModel != null)
transformModels.Insert(0, lastTransformModel);

var localModelInput = new Transforms.ManyHeterogeneousModelCombiner
{
if (lastTransformModel != null)
transformModels.Insert(0, lastTransformModel);

var localModelInput = new Transforms.ManyHeterogeneousModelCombiner
{
PredictorModel = predictorDataStep.Model,
TransformModels = new ArrayVar<ITransformModel>(transformModels.ToArray())
};

var localModelOutput = experiment.Add(localModelInput);

var scorer = new Transforms.Scorer
{
PredictorModel = localModelOutput.PredictorModel
};

var scorerOutput = experiment.Add(scorer);
lastTransformModel = scorerOutput.ScoringTransform;
step = new ScorerPipelineStep(scorerOutput.ScoredData, scorerOutput.ScoringTransform);
transformModels.Clear();
}
}
PredictorModel = predictorDataStep.Model,
TransformModels = new ArrayVar<ITransformModel>(transformModels.ToArray())
};

if (transformModels.Count > 0)
{
transformModels.Insert(0,lastTransformModel);
var modelInput = new Transforms.ModelCombiner
var localModelOutput = experiment.Add(localModelInput);

var scorer = new Transforms.Scorer
{
Models = new ArrayVar<ITransformModel>(transformModels.ToArray())
PredictorModel = localModelOutput.PredictorModel
};

var modelOutput = experiment.Add(modelInput);
lastTransformModel = modelOutput.OutputModel;
var scorerOutput = experiment.Add(scorer);
lastTransformModel = scorerOutput.ScoringTransform;
step = new ScorerPipelineStep(scorerOutput.ScoredData, scorerOutput.ScoringTransform);
transformModels.Clear();
}
}

experiment.Compile();
foreach (ILearningPipelineLoader loader in loaders)
if (transformModels.Count > 0)
{
transformModels.Insert(0, lastTransformModel);
var modelInput = new Transforms.ModelCombiner
{
loader.SetInput(environment, experiment);
}
experiment.Run();
Models = new ArrayVar<ITransformModel>(transformModels.ToArray())
};

ITransformModel model = experiment.GetOutput(lastTransformModel);
BatchPredictionEngine<TInput, TOutput> predictor;
using (var memoryStream = new MemoryStream())
{
model.Save(environment, memoryStream);
var modelOutput = experiment.Add(modelInput);
lastTransformModel = modelOutput.OutputModel;
}

memoryStream.Position = 0;
experiment.Compile();
foreach (ILearningPipelineLoader loader in loaders)
{
loader.SetInput(Env, experiment);
}
experiment.Run();

predictor = environment.CreateBatchPredictionEngine<TInput, TOutput>(memoryStream);
ITransformModel model = experiment.GetOutput(lastTransformModel);
BatchPredictionEngine<TInput, TOutput> predictor;
using (var memoryStream = new MemoryStream())
{
model.Save(Env, memoryStream);

return new PredictionModel<TInput, TOutput>(predictor, memoryStream);
}
memoryStream.Position = 0;

predictor = Env.CreateBatchPredictionEngine<TInput, TOutput>(memoryStream);

return new PredictionModel<TInput, TOutput>(predictor, memoryStream);
}
}

Expand All @@ -220,9 +228,9 @@ public PredictionModel<TInput, TOutput> Train<TInput, TOutput>()
/// <returns>
/// The IDataView that was returned by the pipeline.
/// </returns>
internal IDataView Execute(IHostEnvironment environment)
internal IDataView Execute()
{
Experiment experiment = environment.CreateExperiment();
Experiment experiment = Env.CreateExperiment();
ILearningPipelineStep step = null;
List<ILearningPipelineLoader> loaders = new List<ILearningPipelineLoader>();
foreach (ILearningPipelineItem currentItem in this)
Expand All @@ -241,7 +249,7 @@ internal IDataView Execute(IHostEnvironment environment)
experiment.Compile();
foreach (ILearningPipelineLoader loader in loaders)
{
loader.SetInput(environment, experiment);
loader.SetInput(Env, experiment);
}
experiment.Run();

Expand Down
10 changes: 5 additions & 5 deletions src/Microsoft.ML/LearningPipelineDebugProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using Microsoft.ML.Runtime;
using Microsoft.ML.Runtime.Data;
using Microsoft.ML.Transforms;
using System;
Expand All @@ -25,7 +26,7 @@ internal sealed class LearningPipelineDebugProxy
private const int MaxSlotNamesToDisplay = 100;

private readonly LearningPipeline _pipeline;
private readonly TlcEnvironment _environment;
private readonly IHostEnvironment _environment;
private IDataView _preview;
private Exception _pipelineExecutionException;
private PipelineItemDebugColumn[] _columns;
Expand All @@ -36,10 +37,9 @@ public LearningPipelineDebugProxy(LearningPipeline pipeline)
if (pipeline == null)
throw new ArgumentNullException(nameof(pipeline));

_pipeline = new LearningPipeline();
_pipeline = new LearningPipeline(seed:42, concurrency:1);

// use a ConcurrencyFactor of 1 so other threads don't need to run in the debugger
_environment = new TlcEnvironment(conc: 1);
_environment = _pipeline.Env;

foreach (ILearningPipelineItem item in pipeline)
{
Expand Down Expand Up @@ -139,7 +139,7 @@ private IDataView ExecutePipeline()
{
try
{
_preview = _pipeline.Execute(_environment);
_preview = _pipeline.Execute();
}
catch (Exception e)
{
Expand Down
96 changes: 96 additions & 0 deletions src/Microsoft.ML/Runtime/DefaultEnvironment.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
using Microsoft.ML.Runtime.Data;
using System;

namespace Microsoft.ML.Runtime
{
public sealed class DefaultEnvironment : HostEnvironmentBase<DefaultEnvironment>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why this type is public?

{
public DefaultEnvironment(int? seed = null, int conc = 0)
: this(RandomUtils.Create(seed), true, conc)
{
}

public DefaultEnvironment(IRandom rand, bool verbose, int conc, string shortName = null, string parentFullName = null) : base(rand, verbose, conc, shortName, parentFullName)
{
EnsureDispatcher<ChannelMessage>();
AddListener<ChannelMessage>(OnMessageRecieved);
}

void OnMessageRecieved(IMessageSource sender, ChannelMessage msg)
{
ChannelMessageEventArgs eventArgs = new ChannelMessageEventArgs() { Message = msg };
MessageRecieved?.Invoke(this, eventArgs);
}

public event EventHandler<ChannelMessageEventArgs> MessageRecieved;
public class ChannelMessageEventArgs : EventArgs
{
public ChannelMessage Message { get; set; }
}

private sealed class Channel : ChannelBase
{
public Channel(DefaultEnvironment master, ChannelProviderBase parent, string shortName, Action<IMessageSource, ChannelMessage> dispatch)
: base(master, parent, shortName, dispatch)
{
}
}

private sealed class Host : HostBase
{
public new bool IsCancelled => Root.IsCancelled;

public Host(HostEnvironmentBase<DefaultEnvironment> source, string shortName, string parentFullName, IRandom rand, bool verbose, int? conc)
: base(source, shortName, parentFullName, rand, verbose, conc)
{
}

protected override IChannel CreateCommChannel(ChannelProviderBase parent, string name)
{
Contracts.AssertValue(parent);
Contracts.Assert(parent is Host);
Contracts.AssertNonEmpty(name);
return new Channel(Root, parent, name, GetDispatchDelegate<ChannelMessage>());
}

protected override IPipe<TMessage> CreatePipe<TMessage>(ChannelProviderBase parent, string name)
{
Contracts.AssertValue(parent);
Contracts.Assert(parent is Host);
Contracts.AssertNonEmpty(name);
return new Pipe<TMessage>(parent, name, GetDispatchDelegate<TMessage>());
}

protected override IHost RegisterCore(HostEnvironmentBase<DefaultEnvironment> source, string shortName, string parentFullName, IRandom rand, bool verbose, int? conc)
{
return new Host(source, shortName, parentFullName, rand, verbose, conc);
}
}

protected override IHost RegisterCore(HostEnvironmentBase<DefaultEnvironment> source, string shortName, string parentFullName, IRandom rand, bool verbose, int? conc)
{
Contracts.AssertValue(rand);
Contracts.AssertValueOrNull(parentFullName);
Contracts.AssertNonEmpty(shortName);
Contracts.Assert(source == this || source is Host);
return new Host(source, shortName, parentFullName, rand, verbose, conc);
}

protected override IChannel CreateCommChannel(ChannelProviderBase parent, string name)
{
Contracts.AssertValue(parent);
Contracts.Assert(parent is DefaultEnvironment);
Contracts.AssertNonEmpty(name);
return new Channel(this, parent, name, GetDispatchDelegate<ChannelMessage>());
}

protected override IPipe<TMessage> CreatePipe<TMessage>(ChannelProviderBase parent, string name)
{
Contracts.AssertValue(parent);
Contracts.Assert(parent is DefaultEnvironment);
Contracts.AssertNonEmpty(name);
return new Pipe<TMessage>(parent, name, GetDispatchDelegate<TMessage>());
}
}

}
2 changes: 1 addition & 1 deletion test/Microsoft.ML.TestFramework/ModelHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Microsoft.ML.TestFramework
{
public static class ModelHelper
{
private static TlcEnvironment s_environment = new TlcEnvironment(seed: 1);
private static TlcEnvironment s_environment = new TlcEnvironment(seed: 1, conc: 1);
private static ITransformModel s_housePriceModel;

public static void WriteKcHousePriceModel(string dataPath, string outputModelPath)
Expand Down
8 changes: 7 additions & 1 deletion test/Microsoft.ML.Tests/LearningPipelineTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using Microsoft.ML;
using Microsoft.ML.TestFramework;
using System;
using System.Linq;
using Xunit;
using Xunit.Abstractions;
Expand All @@ -22,12 +23,15 @@ public LearningPipelineTests(ITestOutputHelper output)
public void ConstructorDoesntThrow()
{
Assert.NotNull(new LearningPipeline());
Assert.NotNull(new LearningPipeline(seed:42));
Assert.NotNull(new LearningPipeline(concurrency: 1));
Assert.NotNull(new LearningPipeline(seed:42, concurrency: 1));
}

[Fact]
public void CanAddAndRemoveFromPipeline()
{
var pipeline = new LearningPipeline()
var pipeline = new LearningPipeline(seed:42, concurrency: 1)
{
new Transforms.CategoricalOneHotVectorizer("String1", "String2"),
new Transforms.ColumnConcatenator(outputColumn: "Features", "String1", "String2", "Number1", "Number2"),
Expand All @@ -42,5 +46,7 @@ public void CanAddAndRemoveFromPipeline()
pipeline.Add(new Trainers.StochasticDualCoordinateAscentRegressor());
Assert.Equal(3, pipeline.Count);
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public void TrainAndPredictHousePriceModelTest()
{
string dataPath = GetDataPath("kc_house_data.csv");

var pipeline = new LearningPipeline();
var pipeline = new LearningPipeline(seed: 42, concurrency : 1);

pipeline.Add(new TextLoader<HousePriceData>(dataPath, useHeader: true, separator: ","));

Expand Down
Loading