In the consumer-producer pattern, I want to have a single consumer under which I can define multiple optional tasks. I will store these tasks in a dictionary and execute them later within the consumer. To implement this example I will use the architecture from Steven van Deursen and channels.
In short, my consumer looks like this:
/// <summary>
/// Single consumer for the channel: for every item read from the command's reader it runs
/// each registered task in turn, handing the item to the task through that task's command.
/// </summary>
/// <param name="listOfTasksToExecuteInReadAllAsync">
/// The tasks to run for every item. The key is the handler to execute, the value is the
/// command object passed to it; the same command instance is reused for every item.
/// </param>
public class ReadAllHandlerAsync(
    Dictionary<IListOfTasksToExecuteInReader, IListOfTasksToExecuteInReaderCommand> listOfTasksToExecuteInReadAllAsync)
    : IListOfTasksToExecute
{
    /// <summary>
    /// Reads items from <c>command.MyChannelReader</c> until <c>ReadAllAsync()</c> ends and
    /// awaits every registered task for each item, one task after another.
    /// </summary>
    /// <param name="command">
    /// Supplies the reader. When it is a <c>ReadAllAsyncHandlerCommand</c>, exceptions thrown
    /// by individual tasks are enqueued on its <c>Exceptions</c> collection and processing
    /// continues with the next task.
    /// </param>
    /// <returns>A task that completes when the reader has been drained.</returns>
    public async Task Execute(IListOfTasksToExecuteCommand command)
    {
        var reader = command.MyChannelReader;
        if (reader is null)
        {
            // Nothing to consume.
            return;
        }

        // Resolve the exception sink once. A hard cast inside the catch block would throw
        // InvalidCastException for any other command type and hide the real failure.
        var exceptionSink = command as ReadAllAsyncHandlerCommand;

        await foreach (var latLngFileName in reader.ReadAllAsync())
        {
            foreach (var taskToExecuteCommand in listOfTasksToExecuteInReadAllAsync)
            {
                try
                {
                    // Hand the current item to the task via its (shared) command object.
                    taskToExecuteCommand.Value.LatLngFileName = latLngFileName;
                    await taskToExecuteCommand.Key.Execute(taskToExecuteCommand.Value);
                }
                // Without a sink there is nowhere to record the failure, so the filter lets
                // the original exception propagate to the caller unchanged.
                catch (Exception ex) when (exceptionSink is not null)
                {
                    exceptionSink.Exceptions.Enqueue(ex);
                }
            }
        }
    }
}
Then I created a wrapper around my Parallel.ForEachAsync. Since, theoretically speaking, I could also have more consumers, the wrapper also holds a list of tasks that it can start before Parallel.ForEachAsync begins, and to which it hands over the channel:
/// <summary>
/// Wraps <c>MyParallelForEachAsync</c>: starts a configurable set of tasks first, then runs
/// the wrapped handler, and finally waits for the tasks that were started.
/// </summary>
/// <param name="myParallelForEachAsync">The wrapped handler that is awaited after the tasks have been started.</param>
/// <param name="listOfTasksToExecuteBeforeStartForEach">
/// Tasks to start before the wrapped handler runs. The key is the handler to execute, the
/// value is the command object passed to it.
/// </param>
public class MyParallelForEachAsyncWrapper(
    MyParallelForEachAsync myParallelForEachAsync,
    Dictionary<IListOfTasksToExecute, IListOfTasksToExecuteCommand> listOfTasksToExecuteBeforeStartForEach)
    : ICommandHandlerAsync<MyParallelForEachAsyncWrapperCommand>
{
    /// <summary>
    /// Starts the registered tasks without awaiting them, awaits the wrapped handler, and then
    /// awaits all started tasks.
    /// </summary>
    /// <param name="command">
    /// Carries the folder name and channels. Exceptions raised while starting the tasks or
    /// while running the wrapped handler are enqueued on its <c>Exceptions</c> collection.
    /// </param>
    /// <returns>A task that completes once every started task has finished.</returns>
    public async Task Execute(MyParallelForEachAsyncWrapperCommand command)
    {
        var startedTasks = new List<Task>();

        try
        {
            StartTasksBeforeForEach(command, startedTasks);

            await myParallelForEachAsync.Execute(new MyParallelForEachAsyncCommand
            {
                FolderName = command.FolderName,
                MyChannel = command.MyChannel
            });
        }
        catch (Exception failure)
        {
            command.Exceptions.Enqueue(failure);
        }
        finally
        {
            // Runs outside the catch above: a fault surfaced by WhenAll is not enqueued
            // but propagates to the caller.
            await Task.WhenAll(startedTasks);
        }
    }

    // Starts every registered task and records it in the caller's list as it goes, so that
    // tasks already started are still awaited if a later start throws.
    private void StartTasksBeforeForEach(MyParallelForEachAsyncWrapperCommand command, List<Task> startedTasks)
    {
        foreach (var entry in listOfTasksToExecuteBeforeStartForEach)
        {
            var handler = entry.Key;
            var handlerCommand = entry.Value;

            // Hand the info channel over only when the wrapper command carries one.
            if (command.MyInfoChannel != null)
            {
                handlerCommand.MyInfoChannelReader = command.MyInfoChannel;
            }

            startedTasks.Add(handler.Execute(handlerCommand));
        }
    }
}
Notice how I am handing over the channel:
// Excerpt from MyParallelForEachAsyncWrapper.Execute: when the wrapper command carries an
// info channel, it is assigned to the task's command before that task is started.
if (command.MyInfoChannel != null)
{
taskToExecuteBeforeStartForEach.Value.MyInfoChannelReader = command.MyInfoChannel;
}
and
// Excerpt from MyParallelForEachAsyncWrapper.Execute: the folder name and the channel are
// copied from the wrapper command into the command for the wrapped handler, which is then awaited.
var myParallelForEachAsyncCommand = new MyParallelForEachAsyncCommand
{
FolderName = command.FolderName
, MyChannel = command.MyChannel
};
await myParallelForEachAsync.Execute(myParallelForEachAsyncCommand);
Here is a small example of how to display the type and size of fields from a schema table. This example works for MS SQL; I haven't tested it on other databases.
using System;
using System.Data;
using System.Data.SqlClient;
using System.Windows.Forms;
namespace GetDbDataTypes
{
    /// <summary>
    /// Demo form that displays the schema table (column names, types, sizes, ...) of a query
    /// result in a grid.
    /// </summary>
    public partial class Form1 : Form
    {
        // Command timeout in seconds.
        private const int CommandTimeoutSeconds = 3600;

        public Form1()
        {
            InitializeComponent();
        }

        /// <summary>
        /// Opens a connection, requests the schema of the query result and binds the
        /// resulting schema table to <c>dataGridView1</c>.
        /// </summary>
        /// <param name="sender">The control that raised the event.</param>
        /// <param name="e">Event data (unused).</param>
        private void btnStart_Click(object sender, EventArgs e)
        {
            string connectionString = "Server=myServer;Database=myDb;Integrated Security=True";
            string query = "SELECT top 1 * FROM [myTable]";

            using (SqlConnection connection = new SqlConnection(connectionString))
            using (SqlCommand command = new SqlCommand(query, connection))
            {
                connection.Open();
                command.CommandTimeout = CommandTimeoutSeconds;

                // Only column metadata is needed, so ask the provider for schema
                // information only instead of running the query for its rows.
                using (SqlDataReader reader = command.ExecuteReader(CommandBehavior.SchemaOnly))
                {
                    dataGridView1.DataSource = reader.GetSchemaTable();
                }
            }
        }
    }
}