坚持学习WF(14):自定义持久化服务

系统 1580 0

[置顶]坚持学习WF文章索引

我们除了使用WF提供的SqlWorkflowPersistenceService外,还可以自定义持久化服务。因为有的时候你可能不想使用Sql Server数据库,我们就可以通过自定义持久化服务来使用其他的数据库,文件等来进行持久化存储。

一:1.1 我们先看一个MSDN中的例子,当从内存中卸载工作流时,工作流运行时可使用该服务将工作流实例状态保存到文件。该持久服务类代码如下FilePersistence.cs:
    public class FilePersistenceService : WorkflowPersistenceService
    {
        public readonly static TimeSpan MaxInterval = new TimeSpan(30, 0, 0, 0);

        private bool unloadOnIdle = false ;
        private Dictionary<Guid,Timer> instanceTimers;

        public FilePersistenceService( bool unloadOnIdle)
        {
            this .unloadOnIdle = unloadOnIdle;
            this .instanceTimers = new Dictionary<Guid, Timer>();
        }

        protected override void SaveWorkflowInstanceState(Activity rootActivity, bool unlock)
        {
            // Save the workflow
            Guid contextGuid = (Guid)rootActivity.GetValue(Activity.ActivityContextGuidProperty);
            Console.WriteLine(" Saving instance: {0}\n ", contextGuid);
            SerializeToFile( WorkflowPersistenceService.GetDefaultSerializedForm(rootActivity), contextGuid);

            // See when the next timer (Delay activity) for this workflow will expire
            TimerEventSubscriptionCollection timers = (TimerEventSubscriptionCollection)rootActivity.GetValue(TimerEventSubscriptionCollection.TimerCollectionProperty);
            TimerEventSubscription subscription = timers.Peek();
            if (subscription != null )
            {
                // Set a system timer to automatically reload this workflow when its next timer expires
                TimerCallback callback = new TimerCallback(ReloadWorkflow);
                TimeSpan timeDifference = subscription.ExpiresAt - DateTime.UtcNow;
                // check to make sure timeDifference is in legal range
                if (timeDifference > FilePersistenceService.MaxInterval)
                {
                    timeDifference = FilePersistenceService.MaxInterval;
                }
                else if (timeDifference < TimeSpan.Zero)
                {
                    timeDifference = TimeSpan.Zero;
                }
                this .instanceTimers.Add(contextGuid, new System.Threading.Timer(
                    callback,
                    subscription.WorkflowInstanceId,
                    timeDifference,
                    new TimeSpan(-1)));
            }
        }

        private void ReloadWorkflow( object id)
        {
            // Reload the workflow so that it will continue processing
            Timer toDispose;
            if ( this .instanceTimers.TryGetValue((Guid)id, out toDispose))
            {
                this .instanceTimers.Remove((Guid)id);
                toDispose.Dispose();
            }
            this .Runtime.GetWorkflow((Guid)id);
        }

        // Load workflow instance state.
        protected override Activity LoadWorkflowInstanceState(Guid instanceId)
        {
            Console.WriteLine(" Loading instance: {0}\n ", instanceId);
            byte [] workflowBytes = DeserializeFromFile(instanceId);
            return WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, null );
        }

        // Unlock the workflow instance state.
        // Instance state locking is necessary when multiple runtimes share instance persistence store
        protected override void UnlockWorkflowInstanceState(Activity state)
        {
            //File locking is not supported in this sample
        }

        // Save the completed activity state.
        protected override void SaveCompletedContextActivity(Activity activity)
        {
            Guid contextGuid = (Guid)activity.GetValue(Activity.ActivityContextGuidProperty);
            Console.WriteLine(" Saving completed activity context: {0} ", contextGuid);
            SerializeToFile(
                WorkflowPersistenceService.GetDefaultSerializedForm(activity), contextGuid);
        }

        // Load the completed activity state.
        protected override Activity LoadCompletedContextActivity(Guid activityId, Activity outerActivity)
        {
            Console.WriteLine(" Loading completed activity context: {0} ", activityId);
            byte [] workflowBytes = DeserializeFromFile(activityId);
            Activity deserializedActivities = WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, outerActivity);
            return deserializedActivities;

        }

        protected override bool UnloadOnIdle(Activity activity)
        {
            return unloadOnIdle;
        }
        // Serialize the activity instance state to file
        private void SerializeToFile( byte [] workflowBytes, Guid id)
        {
            String filename = id.ToString();
            FileStream fileStream = null ;
            try
            {
                if (File.Exists(filename))
                    File.Delete(filename);

                fileStream = new FileStream(filename, FileMode.CreateNew, FileAccess.Write, FileShare.None);

                // Get the serialized form
                fileStream.Write(workflowBytes, 0, workflowBytes.Length);
            }
            finally
            {
                if (fileStream != null )
                    fileStream.Close();
            }
        }
        // Deserialize the instance state from the file given the instance id
        private byte [] DeserializeFromFile(Guid id)
        {
            String filename = id.ToString();
            FileStream fileStream = null ;
            try
            {
                // File opened for shared reads but no writes by anyone
                fileStream = new FileStream(filename, FileMode.Open, FileAccess.Read, FileShare.Read);
                fileStream.Seek(0, SeekOrigin.Begin);
                byte [] workflowBytes = new byte [fileStream.Length];

                // Get the serialized form
                fileStream.Read(workflowBytes, 0, workflowBytes.Length);

                return workflowBytes;
            }
            finally
            {
                fileStream.Close();
            }
        }
    }

1.2 看看我们的工作流设计,只需要放入一个DelayActivity即可并设置他的Timeout时间,如下图:

1.3 宿主程序加载了我们自定义的持久化服务后,执行结果如下:

二:上面的例子其实很简单,我们只是做了一些基本的操作,还有很多工作没有做。我们就来说说如何自定义持久化服务。自定义持久化服务大概有以下几步:

1.定义自己的持久化类FilePersistenceService ,必须继承自WorkflowPersistenceService.
2.实现WorkflowPersistenceService中所有的抽象方法,下面会具体介绍。
3.把自定义的持久化服务装载到工作流引擎中(和装载WF提供的标准服务的方式一样)。

下面我们来介绍下WorkflowPersistenceService类中相关的抽象方法:

2.1 SaveWorkflowInstanceState: 将工作流实例状态保存到数据存储区。

    
      protected
    
    
      internal
    
    
      abstract
    
    
      void
    
     SaveWorkflowInstanceState(Activity rootActivity,
    
      bool
    
     unlock)
  
    rootActivity:工作流实例的根 Activity。
    
unlock:如果工作流实例不应锁定,则为 true;如果工作流实例应该锁定,则为 false。 关于锁方面的我们暂时不提。
     
  
    在这个方法中我们会把rootActivity 序列化到 Stream 中,可以看我们上面的例子中的代码实现的该方法中有这样一段
  
     
  
    SerializeToFile( WorkflowPersistenceService.GetDefaultSerializedForm(rootActivity), contextGuid);
    
private void SerializeToFile( byte [] workflowBytes, Guid id)
     
  
    SerializeToFile方法的第一个参数是需要一个byte[]的数组,我们是使用WorkflowPersistenceService.GetDefaultSerializedForm
    
(rootActivity)来实现的,那我们Reflector一下WorkflowPersistenceService这个类中的GetDefaultSerializedForm
方法,代码如下:
    
      protected
    
    
      static
    
    
      byte
    
    [] GetDefaultSerializedForm(Activity activity)
    {
        DateTime now = DateTime.Now;
        
    
      using
    
     (MemoryStream stream = 
    
      new
    
     MemoryStream(0x2800))
        {
            stream.Position = 0L;
            activity.Save(stream);
            
    
      using
    
     (MemoryStream stream2 = 
    
      new
    
     MemoryStream((
    
      int
    
    ) stream.Length))
            {
                
    
      using
    
     (GZipStream stream3 = 
    
      new
    
     GZipStream(stream2, CompressionMode.Compress,
    
true )) { stream3.Write(stream.GetBuffer(), 0, ( int ) stream.Length); } ActivityExecutionContextInfo info = (ActivityExecutionContextInfo)
activity.GetValue(Activity.ActivityExecutionContextInfoProperty); TimeSpan span = (TimeSpan) (DateTime.Now - now); WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, " Serialized a
{0} with id {1} to length {2}. Took {3}.
", new object [] { info, info.ContextGuid,
stream2.Length, span }); byte [] array = stream2.GetBuffer(); Array.Resize< byte >( ref array, Convert.ToInt32(stream2.Length)); return array; } } }

可以看出主要是调用了activity.Save(stream); ,将rootActivity 序列化到 Stream 中,如果我们自定义这个方法我们要调用 Activity的save方法将Activity序列化到stream中去,我们在实现LoadWorkflowInstanceState方法时会调用Activity的Load方法来读取,另外我们把工作流保存到持久化存储里我们一般都使用WorkflowInstanceId来做为唯一性标识

当工作流实例完成或终止时,工作流运行时引擎最后一次调用 SaveWorkflowInstanceState。因此,如果 GetWorkflowStatus等于 Completed或 Terminated,则可以从数据存储区中安全地删除工作流实例及其所有关联的已完成作用域。此外,可以订阅 WorkflowCompleted或 WorkflowTerminated事件,确定何时可以安全地删除与工作流实例关联的记录。是否确实从数据存储区中删除记录取决于您的实现。

如果无法将工作流实例状态保存到数据存储区,则应引发带有适当错误消息的 PersistenceException。

  2.2 LoadWorkflowInstanceState : SaveWorkflowInstanceState 中保存的工作流实例的指定状态加载回内存

    
      protected
    
    
      internal
    
    
      abstract
    
     Activity LoadWorkflowInstanceState(Guid instanceId)
  
instanceId:工作流实例的根活动的 Guid。

必须还原活动的相同副本。为此,必须从数据存储区中工作流实例的表示形式中还原有效的 Stream;然后,必须将此 Stream 传递到重载的 Load 方法之一,用于反序列化工作流实例。如果持久性服务无法从其数据存储区加载工作流实例状态,则它应引发带有适当消息的 PersistenceException。

在我们上面例子中实现的该方法中,

    
      protected
    
    
      override
    
     Activity LoadWorkflowInstanceState(Guid instanceId) 
{ 
    Console.WriteLine("
    
      Loading instance: {0}\n
    
    ", instanceId); 
    
    
      byte
    
    [] workflowBytes = DeserializeFromFile(instanceId); 
    
    
      return
    
     WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, 
    
      null
    
    ); 
}
  

我们调用了WorkflowPersistenceService.RestoreFromDefaultSerializedForm(workflowBytes, null);方法
我们Reflector出WorkflowPersistenceService类的代码后可以看到,如下代码:

protected static Activity RestoreFromDefaultSerializedForm( byte [] activityBytes, Activity outerActivity)
    {
        Activity activity;
        DateTime now = DateTime.Now;
        MemoryStream stream = new MemoryStream(activityBytes);
        stream.Position = 0L;
        using (GZipStream stream2 = new GZipStream(stream, CompressionMode.Decompress, true ))
        {
            activity = Activity.Load(stream2, outerActivity);
        }
        TimeSpan span = (TimeSpan) (DateTime.Now - now);
        WorkflowTrace.Host.TraceEvent(TraceEventType.Information, 0, " Deserialized a {0}
                                                    to length {1}. Took {2}.
", new object [] { activity, stream.Length, span });
        return activity;
    }

    这段代码核心的调用了Activity.Load方法将从 Stream 加载 Activity的实例。
  
    
      2.3 SaveCompletedContextActivity
    
  
    
      protected
    
    
      internal
    
    
      abstract
    
    
      void
    
     SaveCompletedContextActivity(Activity activity)
  
    activity:表示已完成范围的 Activity。
  
     
  

将指定的已完成作用域保存到数据存储区。保存完成活动的AEC环境以便实现补偿,比如WhileActivity他每次循环的
都会创建新的AEC环境,这个时候完成的活动的AEC就会被保存,但是前提是这个活动要支持补偿才可以,所有如果你的WhileActivity里包含SequenceActivity这样该方法是不会被调用的,如果你换成CompensatableSequenceActivity就可以了

工作流运行时引擎保存已完成作用域活动的状态,以便实现补偿。必须调用重载的 Save 方法之一,将 activity 序列化到 Stream 中;然后可以选择在将 Stream 写入到数据存储区之前,对其执行其他处理。但是,在工作流运行时引擎调用 LoadCompletedContextActivity时,必须还原活动的相同副本。

本例子的程序中不会涉及到这部分

也同样使用了WorkflowPersistenceService的GetDefaultSerializedForm方法

2.4 LoadCompletedContextActivity 

和SaveCompletedContextActivity是对应的,加载SaveCompletedContextActivity中保存的已完成活动的AEC,就不多说了

2.5 UnlockWorkflowInstanceState: 解除对工作流实例状态的锁定。

此方法是抽象的,因此它不包含对锁定和解锁的默认实现。

实现自定义持久性服务时,如果要实现锁定方案,则需要重写此方法,并在 SaveWorkflowInstanceState方法中提供根据解锁参数的值进行锁定-解锁的机制。

比如我们的工作流在取消的时候,这个方法被调用来解除工作流实例状态的锁定。

2.6 UnloadOnIdle

返回一个布尔值,确定在工作流空闲时是否将其卸载。

这些都只是自定义持久化中最基本的,就先说这些吧。

坚持学习WF(14):自定义持久化服务


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论