简易的分布式文件系统

系统 1576 0

简易的分布式文件系统

本来初期打算用Hadoop 2,可是后来有限的服务器部署了Solr Cloud,各种站点,发现资源不够了,近10T的文件,已经几乎把服务器的磁盘全部用光。想来想去,由于目前架构基于Scala的,所以还是用Scala Akka实现了一个简单版本的分布式文件系统。

 

Scala版本是2.10.3:http://www.scala-lang.org,Akka版本是2.2.3:http://akka.io。

 

所有文件随机放在不同的服务器上,在数据库中记录了文件存放的服务器IP地址、文件路径。在服务端部署基于Akka的简单文件服务,接收文件路径,读取并返回文件内容。调用者根据文件地址,去数据库中查找文件的服务IP地址和文件路径,根据得到的服务器IP地址,传入文件路径,调用该服务器的文件服务。

 

以下是部分实现代码。

 

1.文件服务参数

 

      
        1
      
      
        case
      
      
        class
      
      
         PatentFulltextArgs(


      
      
        2
      
      
          val url: String,


      
      
        3
      
      
          val start: Int,


      
      
        4
      
      
          val size: Int) {


      
      
        5
      
      
        6
      
       }
    

 

2.文件服务Trait(有点像WCF中的服务契约)

 

      
        1
      
      
        trait PatentFulltextService {


      
      
        2
      
      
          def find(args: PatentFulltextArgs): Array[Byte]


      
      
        3
      
       }
    

 

3.文件服务实现

 

      
         1
      
      
        class
      
       PatentFulltextServiceImpl 
      
        extends
      
      
         PatentFulltextService with Disposable {


      
      
         2
      
         def find(args: PatentFulltextArgs): Array[Byte] =
      
         {


      
      
         3
      
           val list =
      
         ListBuffer[Byte]()


      
      
         4
      
           val file =
      
         FileSystems.getDefault().getPath(args.url)


      
      
         5
      
      
         6
      
           using(Files.newInputStream(file)) { in =>


      
         7
      
      
              {


      
      
         8
      
               val bytes = 
      
        new
      
       Array[Byte](args.size + 1
      
        )


      
      
         9
      
      
                in.skip(args.start)


      
      
        10
      
               in.read(bytes, 0
      
        , bytes.length)


      
      
        11
      
      
        12
      
               list ++=
      
         bytes


      
      
        13
      
      
              }


      
      
        14
      
      
            }


      
      
        15
      
      
        16
      
      
            list.toArray


      
      
        17
      
      
          }


      
      
        18
      
       }
    

 

4.用户Akka Deploy发布的类

 

      
        class
      
       ServiceApplication 
      
        extends
      
      
         Bootable {

  val system 
      
      = ActorSystem("serivce", ConfigFactory.load.getConfig("service"
      
        ))

  def startup() {

    TypedActor(system).typedActorOf(TypedProps[PatentFulltextServiceImpl], 
      
      "patentfulltext"
      
        )

  }



  def shutdown() {

    system.shutdown

  }

}
      
    

 

在这里,我使用的Akka的TypeActor,请参考:http://doc.akka.io/docs/akka/2.2.3/scala/typed-actors.html。

 

以下是部署过程。

 

把生成的jar包,发布在Akka的deploy目录下,根据需要修改Akka的配置文件目录config下的application.conf。以下是我配置的内容,仅供参考:

 

actor {

 

provider = "akka.remote.RemoteActorRefProvider"

 

 

 

typed {

 

# Default timeout for typed actor methods with non-void return type

 

timeout = 6000s

 

}

 

}

 

remote {

 

transport = "akka.remote.netty.NettyRemoteTransport"

 

netty.tcp {

 

   hostname  = "服务端IP"

 

  port = 2552

 

}

 

客户端使用时只需要服务契约Trait和相关实体类,以下是我写的一个客户端调用的类,仅供参考:

 

      
         1
      
      
        object RemoteService {


      
      
         2
      
         val logger = LoggerFactory.getLogger(
      
        this
      
      
        .getClass())


      
      
         3
      
      
        private
      
       var system: ActorSystem = 
      
        null
      
      
         4
      
      
         5
      
         def apply(configFile: String) =
      
         {


      
      
         6
      
           system = ActorSystem("RemoteService", ConfigFactory.parseFile(
      
        new
      
      
         File(configFile)))


      
      
         7
      
      
          }


      
      
         8
      
      
         9
      
         def findPatentFulltext(serverIp: String, patentFulltextArgs: PatentFulltextArgs) =
      
         {


      
      
        10
      
           TypedActor(system).typedActorOf(TypedProps[com.cloud.akka.service.model.PatentFulltextService], system.actorFor("akka.tcp://serivce@" + serverIp + ":2552/user/patentfulltext"
      
        )).find(patentFulltextArgs)


      
      
        11
      
      
        12
      
      
          }


      
      
        13
      
      
        14
      
         def shutdown =
      
         {


      
      
        15
      
      
        if
      
       (
      
        null
      
       !=
      
         system) system.shutdown()


      
      
        16
      
      
          }


      
      
        17
      
       }}
    

 

以下问题是我还没找到合适的解决办法:

 

1.Akka无法传输大文件,即使修改配置,服务器可以返回,但是接收的客户端还会报错。我的解决方案是在客户端分块读取,然后合并。

 

2.在客户端使用时,TypedActor没有找到使用ActorSelection构建,因为ActorFor是标记为 Deprecated。

 

简易的分布式文件系统


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论