Pārlūkot izejas kodu

添加一个用于在节点间同步文件的 c/s demo

reghao 3 gadi atpakaļ
vecāks
revīzija
aabf6dec0f

+ 16 - 1
README.md

@@ -1,5 +1,20 @@
 # dfs-store
-分布式存储系统
+一个分布式存储系统, 提供文件的存储, 访问和删除等服务, 不支持文件修改.
+
+文件存储:
+- 大文件(大于等于 8MiB)单独存储到文件系统中
+- 小文件(小于 8MiB)合并存储到一个文件块中
+
+文件删除:
+- 大文件标记为删除, 过一段时间后, 由定时任务自动删除
+- 小文件标记为删除并设置一个过期时间, 等到小文件所在文件块中的其他小文件都被标记为删除且最后一个小文件的过期时间超时后, 由定时任务自动删除整个文件块
+
+文件访问:
+- 可为单个文件设置访问权限
+
+## 多节点
+### 同步
+新文件存储到集群中的某个节点后, 还需要同步到集群内的其他节点, 保证整个集群的数据一致, 这里只能是最终一致性, 暂时无法做到强一致性
 
 ## 依赖的第三方服务
 - mysql

+ 83 - 0
src/main/java/cn/reghao/dfs/store/sync/FileClient.java

@@ -0,0 +1,83 @@
+package cn.reghao.dfs.store.sync;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+/**
+ * @author reghao
+ * @date 2022-08-26 15:07:46
+ */
+public class FileClient {
+    private Selector selector;
+
+    public FileClient init(String host, int port) throws IOException, InterruptedException {
+        SocketChannel socketChannel = SocketChannel.open();
+        socketChannel.configureBlocking(false);
+        selector = Selector.open();
+        socketChannel.connect(new InetSocketAddress(host, port));
+        socketChannel.register(selector, SelectionKey.OP_CONNECT);
+        return this;
+    }
+
+    public void start() throws IOException {
+        while (true) {
+            selector.select();
+            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+            while (it.hasNext()) {
+                SelectionKey key = it.next();
+                it.remove();
+                if (key.isConnectable()) {
+                    SocketChannel client = (SocketChannel) key.channel();
+                    if (client.isConnectionPending()) {
+                        client.finishConnect();
+                    }
+                    client.configureBlocking(false);
+                    client.write(ByteBuffer.wrap(new String("halo server").getBytes(StandardCharsets.UTF_8)));
+
+                    client.register(selector, SelectionKey.OP_READ);
+                    System.out.println("客户端连接到服务端");
+                } else if (key.isReadable()) {
+                    System.out.println("客户端从服务端读取数据");
+
+                    String filePath = "/home/reghao/Downloads/haha.dat";
+                    File file = new File(filePath);
+                    if (!file.exists()) {
+                        file.createNewFile();
+                    }
+
+                    FileOutputStream fos = new FileOutputStream(file);
+                    FileChannel fileChannel = fos.getChannel();
+
+                    SocketChannel channel = (SocketChannel) key.channel();
+                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024*1024*10);
+                    int readBytes = 0;
+                    while (readBytes != -1) {
+                        readBytes = channel.read(byteBuffer);
+                        byteBuffer.flip();
+
+                        fileChannel.write(byteBuffer);
+                        fos.flush();
+                        byteBuffer.clear();
+                    }
+
+                    fileChannel.close();
+                    fos.close();
+                    channel.close();
+                    System.out.println("客户端从服务端读取数据完成");
+                } else if (key.isWritable()) {
+                    System.out.println("客户端写数据到服务端");
+                }
+            }
+        }
+    }
+
+    public static void main(String[] args) throws IOException, InterruptedException {
+        new FileClient().init("localhost", 8020).start();
+    }
+}

+ 74 - 0
src/main/java/cn/reghao/dfs/store/sync/FileServer.java

@@ -0,0 +1,74 @@
+package cn.reghao.dfs.store.sync;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.Iterator;
+
+/**
+ * @author reghao
+ * @date 2022-08-26 15:07:39
+ */
+public class FileServer {
+    private Selector selector;
+
+    public FileServer init(int port) throws IOException {
+        ServerSocketChannel serverChannel = ServerSocketChannel.open();
+        serverChannel.configureBlocking(false);
+        serverChannel.socket().bind(new InetSocketAddress(port));
+        selector = Selector.open();
+        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+        return this;
+    }
+
+    public void listen() throws IOException {
+        boolean running = true;
+        while (running) {
+            selector.select();
+            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+            while (it.hasNext()) {
+                SelectionKey key = it.next();
+                it.remove();
+                if (key.isAcceptable()) {
+                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
+                    SocketChannel socketChannel = server.accept();
+                    socketChannel.configureBlocking(false);
+                    socketChannel.register(selector, SelectionKey.OP_WRITE);
+                    System.out.println("客户端请求连接事件...");
+                } else if (key.isReadable()) {
+                    System.out.println("客户端写数据到服务端...");
+                    SocketChannel socketChannel = (SocketChannel) key.channel();
+
+
+                } else if (key.isWritable()) {
+                    System.out.println("客户端从服务端获取数据...");
+                    SocketChannel socketChannel = (SocketChannel) key.channel();
+
+                    String filePath = "/home/reghao/Downloads/mp4/BV1TU4y1W7Wn.mp4";
+                    File file = new File(filePath);
+                    FileInputStream fis = new FileInputStream(file);
+                    FileChannel fileChannel = fis.getChannel();
+                    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024*1024*500);
+                    while (fileChannel.position() < fileChannel.size()) {
+                        int readBytes = fileChannel.read(byteBuffer);
+                        byteBuffer.flip();
+                        while (byteBuffer.hasRemaining()) {
+                            socketChannel.write(byteBuffer);
+                        }
+                        byteBuffer.clear();
+                    }
+
+                    socketChannel.close();
+                    System.out.println("结束写事件...");
+                }
+            }
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+        new FileServer().init(8020).listen();
+    }
+}