Docker搭建MinIo分布式系统

1.什么是分布式文件系统

文件系统是负责管理和存储文件的系统软件,操作系统通过文件系统提供的接口去存取文件,用户通过操作系统访问磁盘上的文件。

下图指示了文件系统所处的位置:

通过概念可以简单理解为:一个计算机无法存储海量的文件,通过网络将若干计算机组织起来共同去存储海量的文件,去接收海量用户的请求,这些组织起来的计算机通过网络进行通信,如下图:

2.MinIo系统

MinIO 是一个非常轻量的服务,可以很简单的和其他应用的结合使用,它兼容亚马逊 S3 云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等。

它一大特点就是轻量,使用简单,功能强大,支持各种平台,单个文件最大5TB,兼容 Amazon S3接口,提供了 JavaPythonGO等多版本SDK支持。

官网:https://min.io

中文:https://www.minio.org.cn/,http://docs.minio.org.cn/docs/

MinIO集群采用去中心化共享架构,每个结点是对等关系,通过Nginx可对MinIO进行负载均衡访问。

它将分布在不同服务器上的多块硬盘组成一个对象存储服务。由于硬盘分布在不同的节点上,分布式Minio避免了单点故障。如下图:

Minio使用纠删码技术来保护数据,它是一种恢复丢失和损坏数据的数学算法,它将数据分块冗余的分散存储在各各节点的磁盘上,所有的可用磁盘组成一个集合,上图由8块硬盘组成一个集合,当上传一个文件时会通过纠删码算法计算对文件进行分块存储,除了将文件本身分成4个数据块,还会生成4个校验块,数据块和校验块会分散的存储在这8块硬盘上。

使用纠删码的好处是即便丢失一半数量(N/2)的硬盘,仍然可以恢复数据。 比如上边集合中有4个以内的硬盘损害仍可保证数据恢复,不影响上传和下载,如果多于一半的硬盘坏了则无法恢复。

说白了只要还存在一半及其以上的结点,就会自动恢复硬盘🍹

3.docker 部署

3.1.下载 Minio 镜像

命令 描述
docker pull minio/minio 下载最新版 Minio 镜像 (其实此命令就等同于 : docker pull minio/minio:latest )
docker pull minio/minio:RELEASE.2022-06-20T23-13-45Z.fips 下载指定版本的 Minio 镜像 (xxx 指具体版本号)

检查当前所有Docker下载的镜像

docker images

3.2.创建目录

一个用来存放配置,一个用来存储上传文件的目录

启动前需要先创建 Minio 外部挂载的配置文件(/mydata/minio/config),和存储上传文件的目录(/mydata/minio/data)

mkdir -p /mydata/minio/config
mkdir -p /mydata/minio/data

3.3.创建 Minio 容器并运行

挂载的多行模式

docker run -p 9000:9000 -p 9090:9090 \
     --net=host \
     --name minio \
     -d --restart=always \
     -e "MINIO_ROOT_USER=minioadmin" \
     -e "MINIO_ROOT_PASSWORD=minioadmin" \
     -v /mydata/minio/data:/data \
     -v /mydata/minio/config:/root/.minio \
     minio/minio server \
     /data --console-address ":9090" -address ":9000"

3.4.访问操作

访问:http://162.14.107.240:9090/login 用户名:密码 minioadmin:minioadmin

3.5.创建 Bucket

并且需要将权限修改了public

3.6.上传文件

4.SDK 操作

官方文档:https://docs.min.io/docs/

maven依赖如下:

<dependency>
    <groupId>io.minio</groupId>
    <artifactId>minio</artifactId>
    <version>8.4.3</version>
</dependency>
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>4.8.1</version>
</dependency>

4.1.上传文件

参数说明:

需要三个参数才能连接到minio服务。

参数 说明
Endpoint 对象存储服务的URL
Access Key Access key就像用户ID,可以唯一标识你的账户。
Secret Key Secret key是你账户的密码。

测试代码如下:

package com.xuecheng.media;

/**
 * @description 测试MinIO
 * @author lemon
 * @date 2022/9/11 21:24
 * @version 1.0
 */
public class MinIOTest {

 static MinioClient minioClient =
         MinioClient.builder()
                 .endpoint("http://162.14.107.240:9000")
                 .credentials("minioadmin", "minioadmin")
                 .build();


 //上传文件
public static void upload()throws IOException, NoSuchAlgorithmException, InvalidKeyException {
 try {
  boolean found =
          minioClient.bucketExists(BucketExistsArgs.builder().bucket("testbucket").build());
  //检查testbucket桶是否创建,没有创建自动创建
  if (!found) {
   minioClient.makeBucket(MakeBucketArgs.builder().bucket("testbucket").build());
  } else {
   System.out.println("Bucket 'testbucket' already exists.");
  }
  //上传1.mp4
  minioClient.uploadObject(
          UploadObjectArgs.builder()
                  .bucket("testbucket")
                  .object("1.mp4")
                  .filename("D:\\develop\\upload\\1.mp4")
                  .build());
  //上传1.avi,上传到avi子目录
  minioClient.uploadObject(
          UploadObjectArgs.builder()
                  .bucket("testbucket")
                  .object("avi/1.avi")
                  .filename("D:\\develop\\upload\\1.avi")
                  .build());
  System.out.println("上传成功");
 } catch (MinioException e) {
  System.out.println("Error occurred: " + e);
  System.out.println("HTTP trace: " + e.httpTrace());
 }

}
public static void main(String[] args)throws IOException, NoSuchAlgorithmException, InvalidKeyException {
 upload();
}


}

4.2.删除文件

下边测试删除文件

参考:https://docs.min.io/docs/java-client-api-reference#removeObject

//删除文件
public static void delete(String bucket,String filepath)throws IOException, NoSuchAlgorithmException, InvalidKeyException {
 try {

  minioClient.removeObject(
          RemoveObjectArgs.builder().bucket(bucket).object(filepath).build());
  System.out.println("删除成功");
 } catch (MinioException e) {
  System.out.println("Error occurred: " + e);
  System.out.println("HTTP trace: " + e.httpTrace());
 }

}

 public static void main(String[] args)throws IOException, NoSuchAlgorithmException, InvalidKeyException {
//  upload();
  delete("testbucket","1.mp4");
  delete("testbucket","avi/1.avi");

 }

4.3.查询文件

通过查询文件查看文件是否存在minio中。

参考:https://docs.min.io/docs/java-client-api-reference#getObject

//下载文件outFile就是下载到本地的路径
 public static void getFile(String bucket,String filepath,String outFile)throws IOException, NoSuchAlgorithmException, InvalidKeyException {
  try {


   try (InputStream stream = minioClient.getObject(
           GetObjectArgs.builder()
                   .bucket(bucket)
                   .object(filepath)
                   .build());
        FileOutputStream fileOutputStream = new FileOutputStream(new File(outFile));
   ) {

    // Read data from stream
    IOUtils.copy(stream,fileOutputStream);
    System.out.println("下载成功");
   }

  } catch (MinioException e) {
   System.out.println("Error occurred: " + e);
   System.out.println("HTTP trace: " + e.httpTrace());
  }

 }


 public static void main(String[] args)throws IOException, NoSuchAlgorithmException, InvalidKeyException {
  upload();
//  delete("testbucket","1.mp4");
//  delete("testbucket","avi/1.avi");
  getFile("testbucket","avi/1.avi","D:\\develop\\minio_data\\1.avi");
 }

4.4.分块上传视频(点断续传)

流程

分块先上传到minio里面,然后从里面下载到本地,在本地进行合成合成文件,再次上传到minio就是这个过程

最终合成完毕后删除分块文件

4.4.1.分块上传
package com.xuecheng.media.api;

/**
 * @author lemon
 * @version 1.0
 * @description 大文件上传接口
 * @date 2022/9/6 11:29
 */

@Api(value = "大文件上传接口", tags = "大文件上传接口")
@RestController
public class BigFilesController {

    @Autowired
    MediaFileService mediaFileService;


    @ApiOperation(value = "文件上传前检查文件")
    @PostMapping("/upload/checkfile")
    public RestResponse<Boolean> checkfile(
            @RequestParam("fileMd5") String fileMd5
    ) throws Exception {
        
    }


    @ApiOperation(value = "分块文件上传前的检测")
    @PostMapping("/upload/checkchunk")
    public RestResponse<Boolean> checkchunk(@RequestParam("fileMd5") String fileMd5,
                                            @RequestParam("chunk") int chunk) throws Exception {
       
    }

    @ApiOperation(value = "上传分块文件")
    @PostMapping("/upload/uploadchunk")
    public RestResponse uploadchunk(@RequestParam("file") MultipartFile file,
                                    @RequestParam("fileMd5") String fileMd5,
                                    @RequestParam("chunk") int chunk) throws Exception {

        
    }

    @ApiOperation(value = "合并文件")
    @PostMapping("/upload/mergechunks")
    public RestResponse mergechunks(@RequestParam("fileMd5") String fileMd5,
                                    @RequestParam("fileName") String fileName,
                                    @RequestParam("chunkTotal") int chunkTotal) throws Exception {
        

    }


}

进行实现

  • fileMd5没有写文件名字的话就根据这个原本的文件名来转为md5的形式,然后目录结构就是
4.4.2.检查文件是否已经上传了
@Override
public RestResponse<Boolean> checkFile(String fileMd5) {
    
    //查询文件信息
    MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
    if (mediaFiles != null) {
        //桶
        String bucket = mediaFiles.getBucket();
        //存储目录
        String filePath = mediaFiles.getFilePath();
        //文件流
        InputStream stream = null;
        try {
            stream = minioClient.getObject(
                    GetObjectArgs.builder()
                            .bucket(bucket)
                            .object(filePath)
                            .build());

            if (stream != null) {
                //文件已存在
                return RestResponse.success(true);
            }
        } catch (Exception e) {
           
        }
    }
    //文件不存在
    return RestResponse.success(false);
}


// 查询分块是否存在
@Override
public RestResponse<Boolean> checkChunk(String fileMd5, int chunkIndex) {

    //得到分块文件目录
    String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
    //得到分块文件的路径
    String chunkFilePath = chunkFileFolderPath + chunkIndex;

    //文件流
    InputStream fileInputStream = null;
    try {
        fileInputStream = minioClient.getObject(
                GetObjectArgs.builder()
                        .bucket(bucket_videoFiles)
                        .object(chunkFilePath)
                        .build());

        if (fileInputStream != null) {
            //分块已存在
            return RestResponse.success(true);
        }
    } catch (Exception e) {
        
    }
    //分块未存在
    return RestResponse.success(false);
}

//得到分块文件的目录
private String getChunkFileFolderPath(String fileMd5) {
    return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";
}
4.4.3.上传分块文件到minio
// 上传分块文件
@Override
public RestResponse uploadChunk(String fileMd5, int chunk, byte[] bytes) {


    //得到分块文件的目录路径
    String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
    //得到分块文件的路径
    String chunkFilePath = chunkFileFolderPath + chunk;

    try {
    //将文件存储至minIO
    addMediaFilesToMinIO(bytes, bucket_videoFiles,chunkFilePath);
     return RestResponse.success(true);
    } catch (Exception ex) {
        ex.printStackTrace();
        log.debug("上传分块文件:{},失败:{}",chunkFilePath,e.getMessage());
    }
    return RestResponse.validfail(false,"上传分块失败");
}
4.4.4.下载所有分块文件

下边先实现检查及下载所有分块的方法。

  • chunkTotal表示分块的个数从0开始的
//检查所有分块是否上传完毕
private File[] checkChunkStatus(String fileMd5, int chunkTotal) {
    //得到分块文件的目录路径
    String chunkFileFolderPath = getChunkFileFolderPath(fileMd5);
    File[] files = new File[chunkTotal];
    //检查分块文件是否上传完毕
    for (int i = 0; i < chunkTotal; i++) {
        String chunkFilePath = chunkFileFolderPath + i;
        //下载文件
        File chunkFile =null;
        try {
            chunkFile = File.createTempFile("chunk" + i, null);
        } catch (IOException e) {
            e.printStackTrace();
            XueChengPlusException.cast("下载分块时创建临时文件出错");
        }
        downloadFileFromMinIO(chunkFile,bucket_videoFiles,chunkFilePath);
        files[i]=chunkFile;
    }
    return files;
}
//得到分块文件的目录
private String getChunkFileFolderPath(String fileMd5) {
    return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + "chunk" + "/";
}
//根据桶和文件路径从minio下载文件
public File downloadFileFromMinIO(File file,String bucket,String objectName){
    InputStream fileInputStream = null;
    OutputStream fileOutputStream = null;
    try {
        fileInputStream = minioClient.getObject(
                GetObjectArgs.builder()
                        .bucket(bucket)
                        .object(objectName)
                        .build());
        try {
            fileOutputStream = new FileOutputStream(file);
            IOUtils.copy(fileInputStream, fileOutputStream);

        } catch (IOException e) {
            XueChengPlusException.cast("下载文件"+objectName+"出错");
        }
    } catch (Exception e) {
        e.printStackTrace();
        XueChengPlusException.cast("文件不存在"+objectName);
    } finally {
        if (fileInputStream != null) {
            try {
                fileInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (fileOutputStream != null) {
            try {
                fileOutputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    return file;
}
4.4.5.合并分块接口实现如下:
@Override
public RestResponse mergechunks(Long companyId, String fileMd5, int chunkTotal, UploadFileParamsDto uploadFileParamsDto) {

    String fileName = uploadFileParamsDto.getFilename();
    //下载所有分块文件
    File[] chunkFiles = checkChunkStatus(fileMd5, chunkTotal);
    //扩展名
    String extName = fileName.substring(fileName.lastIndexOf("."));
    //创建临时文件作为合并文件
    File mergeFile = null;
    try {
        mergeFile = File.createTempFile(fileMd5, extName);
    } catch (IOException e) {
        XueChengPlusException.cast("合并文件过程中创建临时文件出错");
    }

    try {
        //开始合并
        byte[] b = new byte[1024];
        try(RandomAccessFile raf_write = new RandomAccessFile(mergeFile, "rw");) {
            for (File chunkFile : chunkFiles) {
                try (FileInputStream chunkFileStream = new FileInputStream(chunkFile);) {
                    int len = -1;
                    while ((len = chunkFileStream.read(b)) != -1) {
                        //向合并后的文件写
                        raf_write.write(b, 0, len);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
            XueChengPlusException.cast("合并文件过程中出错");
        }
        log.debug("合并文件完成{}",mergeFile.getAbsolutePath());
        uploadFileParamsDto.setFileSize(mergeFile.length());

        try (InputStream mergeFileInputStream = new FileInputStream(mergeFile);) {
            // 就是看文件名字
            // 对文件进行校验,通过比较md5值
            String newFileMd5 = DigestUtils.md5Hex(mergeFileInputStream);
            if (!fileMd5.equalsIgnoreCase(newFileMd5)) {
                //校验失败
                XueChengPlusException.cast("合并文件校验失败");
            }
            log.debug("合并文件校验通过{}",mergeFile.getAbsolutePath());
        } catch (Exception e) {
            e.printStackTrace();
            //校验失败
            XueChengPlusException.cast("合并文件校验异常");
        }

        //将临时文件上传至minio
        String mergeFilePath = getFilePathByMd5(fileMd5, extName);
        try {

            //上传文件到minIO
            addMediaFilesToMinIO(mergeFile.getAbsolutePath(), bucket_videoFiles, mergeFilePath);
            log.debug("合并文件上传MinIO完成{}",mergeFile.getAbsolutePath());
        } catch (Exception e) {
            e.printStackTrace();
            XueChengPlusException.cast("合并文件时上传文件出错");
        }

        //入数据库代理的形式
        MediaFiles mediaFiles = currentProxy.addMediaFilesToDb(companyId, fileMd5, uploadFileParamsDto, bucket_videoFiles, mergeFilePath);
        if (mediaFiles == null) {
            XueChengPlusException.cast("媒资文件入库出错");
        }

        return RestResponse.success();
    } finally {
        //删除临时文件
        for (File file : chunkFiles) {
            try {
                file.delete();
            } catch (Exception e) {

            }
        }
        try {
            mergeFile.delete();
        } catch (Exception e) {

        }
    }
}


 private String getFilePathByMd5(String fileMd5,String fileExt){
    return   fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt;
}


//将文件上传到minIO,传入文件绝对路径
public void addMediaFilesToMinIO(String filePath, String bucket, String objectName) {
    //扩展名
    String extension = null;
    if(objectName.indexOf(".")>=0){
        extension = objectName.substring(objectName.lastIndexOf("."));
    }
    //获取扩展名对应的媒体类型
    String contentType = getMimeTypeByExtension(extension);
    try {
        minioClient.uploadObject(
                UploadObjectArgs.builder()
                        .bucket(bucket)
                        .object(objectName)
                        .filename(filePath)
                        .contentType(contentType)
                        .build());
    } catch (Exception e) {
        e.printStackTrace();
        XueChengPlusException.cast("上传文件到文件系统出错");
    }
}

private String getMimeTypeByExtension(String extension){
    String contentType = MediaType.APPLICATION_OCTET_STREAM_VALUE;
    if(StringUtils.isNotEmpty(extension)){
        ContentInfo extensionMatch = ContentInfoUtil.findExtensionMatch(extension);
        if(extensionMatch!=null){
            contentType = extensionMatch.getMimeType();
        }
    }
    return contentType;

}

最终效果图

Logo

更多推荐