Prechádzať zdrojové kódy

优化 docker 客户端

reghao 5 rokov pred
rodič
commit
5d79933cac

+ 128 - 77
common/src/main/java/cn/reghao/autodop/common/dockerc/Docker.java

@@ -10,6 +10,7 @@ import cn.reghao.autodop.common.dockerc.unixdomain.DockerAuth;
 import cn.reghao.autodop.common.dockerc.unixdomain.DockerHeader;
 import cn.reghao.autodop.common.dockerc.unixdomain.UnixSocketClient;
 import cn.reghao.autodop.common.dockerc.unixdomain.HttpClient;
+import cn.reghao.autodop.common.utils.ExceptionUtil;
 import cn.reghao.autodop.common.utils.serializer.JsonArrayDeserializer;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import cn.reghao.autodop.common.utils.security.Base64Util;
@@ -68,7 +69,7 @@ public class Docker implements ImageOps, ContainerOps, AutoCloseable {
             }
             return result;
         } else {
-            throw new NullPointerException("Docker Response is NULL");
+            throw new DockerException("Docker 响应是 NULL...");
         }
     }
 
@@ -128,7 +129,7 @@ public class Docker implements ImageOps, ContainerOps, AutoCloseable {
     }
 
     @Override
-    public void push(String image) throws DockerException, InterruptedException {
+    public void push(String image) throws DockerException {
         String uri = DockerApi.pushPost.replace("{}", image);
 
         DockerAuth auth = new DockerAuth();
@@ -140,8 +141,12 @@ public class Docker implements ImageOps, ContainerOps, AutoCloseable {
         List<DockerHeader> headers = new ArrayList<>();
         headers.add(new DockerHeader("X-Registry-Auth", encodeAuth));
 
-        FullHttpResponse response = client.post(uri, headers);
-        checkResponse(response);
+        try {
+            FullHttpResponse response = client.post(uri, headers);
+            checkResponse(response);
+        } catch (IOException | InterruptedException e) {
+            throw new DockerException(ExceptionUtil.errorMsg(e));
+        }
     }
 
     @Override
@@ -149,25 +154,37 @@ public class Docker implements ImageOps, ContainerOps, AutoCloseable {
         String params = "?fromImage=" + repoTag;
         String uri = DockerApi.pullPost + params;
 
-        FullHttpResponse response = client.post(uri, null);
-        checkResponse(response);
+        try {
+            FullHttpResponse response = client.post(uri, null);
+            checkResponse(response);
+        } catch (IOException | InterruptedException e) {
+            throw new DockerException(ExceptionUtil.errorMsg(e));
+        }
     }
 
     @Override
     public void rmImage(String repoTag) throws DockerException {
         String uri = DockerApi.rmImageDelete.replace("{}", repoTag);
-        FullHttpResponse response = client.post(uri, null);
-        checkResponse(response);
+        try {
+            FullHttpResponse response = client.post(uri, null);
+            checkResponse(response);
+        } catch (IOException | InterruptedException e) {
+            throw new DockerException(ExceptionUtil.errorMsg(e));
+        }
     }
 
     @Override
     public String inspectImage(String image) throws DockerException {
         String uri = DockerApi.inspectImageGet.replace("{}", image);
-        FullHttpResponse response = client.get(uri, null);
-        if (response != null) {
-            return response.content().toString(StandardCharsets.UTF_8);
-        } else {
-            throw new DockerException("Docker Response is NULL");
+        try {
+            FullHttpResponse response = client.get(uri, null);
+            if (response != null) {
+                return response.content().toString(StandardCharsets.UTF_8);
+            } else {
+                throw new DockerException("请求 " + uri + " 的响应是 NULL...");
+            }
+        } catch (IOException | InterruptedException e) {
+            throw new DockerException(ExceptionUtil.errorMsg(e));
         }
     }
 
@@ -185,19 +202,22 @@ public class Docker implements ImageOps, ContainerOps, AutoCloseable {
                 .replace("networkMode", "NetworkMode");
                 //.replace("volumes", "Volumes");
 
-        FullHttpResponse response = client.postJson(uri, null, json);
-        if (response != null) {
-            int statusCode = response.status().code();
-            String result = response.content().toString(StandardCharsets.UTF_8);
-            if (statusCode == 201) {
-                JsonObject jsonObject = new JsonParser().parse(result).getAsJsonObject();
-                return jsonObject.get("Id").getAsString();
-            } else {
-                throw new DockerException(result);
+        try {
+            FullHttpResponse response = client.postJson(uri, null, json);
+            if (response != null) {
+                int statusCode = response.status().code();
+                String result = response.content().toString(StandardCharsets.UTF_8);
+                if (statusCode == 201) {
+                    JsonObject jsonObject = new JsonParser().parse(result).getAsJsonObject();
+                    return jsonObject.get("Id").getAsString();
+                } else {
+                    throw new DockerException(result);
+                }
             }
+            return null;
+        } catch (IOException | InterruptedException e) {
+            throw new DockerException(ExceptionUtil.errorMsg(e));
         }
-
-        return null;
     }
 
     private String create(String containerName, String repoTag, Config config) throws DockerException {
@@ -210,19 +230,22 @@ public class Docker implements ImageOps, ContainerOps, AutoCloseable {
                 .replace("networkMode", "NetworkMode");
         //.replace("volumes", "Volumes");
 
-        FullHttpResponse response = client.postJson(uri, null, json);
-        if (response != null) {
-            int statusCode = response.status().code();
-            String result = response.content().toString(StandardCharsets.UTF_8);
-            if (statusCode == 201) {
-                JsonObject jsonObject = new JsonParser().parse(result).getAsJsonObject();
-                return jsonObject.get("Id").getAsString();
-            } else {
-                throw new DockerException(result);
+        try {
+            FullHttpResponse response = client.postJson(uri, null, json);
+            if (response != null) {
+                int statusCode = response.status().code();
+                String result = response.content().toString(StandardCharsets.UTF_8);
+                if (statusCode == 201) {
+                    JsonObject jsonObject = new JsonParser().parse(result).getAsJsonObject();
+                    return jsonObject.get("Id").getAsString();
+                } else {
+                    throw new DockerException(result);
+                }
             }
+            return null;
+        } catch (IOException | InterruptedException e) {
+            throw new DockerException(ExceptionUtil.errorMsg(e));
         }
-
-        return null;
     }
 
     @Override
@@ -247,39 +270,58 @@ public class Docker implements ImageOps, ContainerOps, AutoCloseable {
     @Override
     public void start(String containerId) throws DockerException {
         String uri = DockerApi.startPost.replace("{}", containerId);
-        FullHttpResponse response = client.post(uri, null);
-        checkResponse(response);
+        try {
+            FullHttpResponse response = client.post(uri, null);
+            checkResponse(response);
+        } catch (IOException | InterruptedException e) {
+            throw new DockerException(ExceptionUtil.errorMsg(e));
+        }
     }
 
     @Override
     public void stop(String containerId) throws DockerException {
         String uri = DockerApi.stopPost.replace("{}", containerId);
-        FullHttpResponse response = client.post(uri, null);
-        checkResponse(response);
+        try {
+            FullHttpResponse response = client.post(uri, null);
+            checkResponse(response);
+        } catch (IOException | InterruptedException e) {
+            throw new DockerException(ExceptionUtil.errorMsg(e));
+        }
     }
 
     @Override
     public void restart(String containerId) throws DockerException {
         String uri = DockerApi.restartPost.replace("{}", containerId);
-        FullHttpResponse response = client.post(uri, null);
-        checkResponse(response);
+        try {
+            FullHttpResponse response = client.post(uri, null);
+            checkResponse(response);
+        } catch (IOException | InterruptedException e) {
+            throw new DockerException(ExceptionUtil.errorMsg(e));
+        }
     }
 
     @Override
     public void rmContainer(String containerId) throws DockerException {
         String uri = DockerApi.rmContainerDelete.replace("{}", containerId);
-        FullHttpResponse response = client.delete(uri, null);
-        checkResponse(response);
+        try {
+            FullHttpResponse response = client.delete(uri, null);
+            checkResponse(response);
+        } catch (IOException | InterruptedException e) {
+            throw new DockerException(ExceptionUtil.errorMsg(e));
+        }
     }
 
     @Override
     public List<Container> ps(boolean isAll) throws DockerException {
         String uri = DockerApi.psGet + "?all=" + isAll;
-        FullHttpResponse response = client.get(uri, null);
-        String result = checkResponse(response);
-
-        JsonArrayDeserializer<Container> jsonArrayDeserializer = new JsonArrayDeserializer<>();
-        return jsonArrayDeserializer.fromJsonArray(result, Container.class);
+        try {
+            FullHttpResponse response = client.get(uri, null);
+            String result = checkResponse(response);
+            JsonArrayDeserializer<Container> jsonArrayDeserializer = new JsonArrayDeserializer<>();
+            return jsonArrayDeserializer.fromJsonArray(result, Container.class);
+        } catch (IOException | InterruptedException e) {
+            throw new DockerException(ExceptionUtil.errorMsg(e));
+        }
     }
 
     /**
@@ -312,29 +354,33 @@ public class Docker implements ImageOps, ContainerOps, AutoCloseable {
      * @date 2020-05-19 上午10:26
      */
     @Override
-    public List<String> logs(String containerId, String logLevel) {
+    public List<String> logs(String containerId, String logLevel) throws DockerException {
         LogLevel level = LogLevel.valueOf(logLevel);
         String uri = DockerApi.logsGet.replace("{}", containerId);
-        FullHttpResponse response;
-        switch (level) {
-            case info:
-                // stdout
-                String params = "?stdout=true&tail=500";
-                response = client.get(uri + params, null);
-                if (response != null) {
-                    String stdout = response.content().toString(StandardCharsets.UTF_8);
-                    return Arrays.asList(stdout.split(System.lineSeparator()));
-                }
-            case error:
-                // stderr
-                params = "?stderr=true&tail=500";
-                response = client.get(uri + params, null);
-                if (response != null) {
-                    String stderr = response.content().toString(StandardCharsets.UTF_8);
-                    return Arrays.asList(stderr.split(System.lineSeparator()));
-                }
-            default:
-                return null;
+        try {
+            FullHttpResponse response;
+            switch (level) {
+                case info:
+                    // stdout
+                    String params = "?stdout=true&tail=500";
+                    response = client.get(uri + params, null);
+                    if (response != null) {
+                        String stdout = response.content().toString(StandardCharsets.UTF_8);
+                        return Arrays.asList(stdout.split(System.lineSeparator()));
+                    }
+                case error:
+                    // stderr
+                    params = "?stderr=true&tail=500";
+                    response = client.get(uri + params, null);
+                    if (response != null) {
+                        String stderr = response.content().toString(StandardCharsets.UTF_8);
+                        return Arrays.asList(stderr.split(System.lineSeparator()));
+                    }
+                default:
+                    throw new DockerException("日志级别不存在...");
+            }
+        } catch (IOException | InterruptedException e) {
+            throw new DockerException(ExceptionUtil.errorMsg(e));
         }
     }
 
@@ -373,18 +419,23 @@ public class Docker implements ImageOps, ContainerOps, AutoCloseable {
     }
 
     @Override
-    public ContainerInspect inspectContainer(String contianerId) {
+    public ContainerInspect inspectContainer(String contianerId) throws DockerException {
         String uri = DockerApi.inspectContainerGet.replace("{}", contianerId);
-        FullHttpResponse response = client.get(uri, null);
-        if (response != null) {
-            String result =  response.content().toString(StandardCharsets.UTF_8);
-            return (ContainerInspect) JsonConverter.jsonToObject(result, ContainerInspect.class);
+        try {
+            FullHttpResponse response = client.get(uri, null);
+            if (response != null) {
+                String result =  response.content().toString(StandardCharsets.UTF_8);
+                return (ContainerInspect) JsonConverter.jsonToObject(result, ContainerInspect.class);
+            } else {
+                throw new DockerException("请求 " + uri + " 的响应是 NULL...");
+            }
+        } catch (IOException | InterruptedException e) {
+            throw new DockerException(ExceptionUtil.errorMsg(e));
         }
-        return null;
     }
 
     @Override
-    public String rootfs(String containerId) {
+    public String rootfs(String containerId) throws DockerException {
         ContainerInspect containerInspect = inspectContainer(containerId);;
         // TODO 可能抛出 null 异常
         return containerInspect.getGraphDriver().getData().getMergedDir();
@@ -393,7 +444,7 @@ public class Docker implements ImageOps, ContainerOps, AutoCloseable {
     public static void main(String[] args) {
         try (Docker docker = new Docker()) {
             docker.pull("localhost:5000/iq3x/dnkt-mgr:dfc43519");
-        } catch (Exception e) {
+        } catch (DockerException e) {
             e.printStackTrace();
         }
     }

+ 3 - 3
common/src/main/java/cn/reghao/autodop/common/dockerc/api/ContainerOps.java

@@ -41,7 +41,7 @@ public interface ContainerOps {
      * @return
      * @date 2020-05-19 上午10:57
      */
-    List<String> logs(String containerId, String logLevel);
+    List<String> logs(String containerId, String logLevel) throws DockerException;
 
     /**
      * 文件日志
@@ -59,7 +59,7 @@ public interface ContainerOps {
      * @return 容器详细信息
      * @date 2020-01-19 下午1:39
      */
-    ContainerInspect inspectContainer(String containerId);
+    ContainerInspect inspectContainer(String containerId) throws DockerException;
 
     /**
      * 容器映射到宿主机上的根目录
@@ -68,5 +68,5 @@ public interface ContainerOps {
      * @return
      * @date 2020-03-13 下午10:36
      */
-    String rootfs(String containerId);
+    String rootfs(String containerId) throws DockerException;
 }

+ 23 - 45
common/src/main/java/cn/reghao/autodop/common/dockerc/unixdomain/HttpClient.java

@@ -29,7 +29,7 @@ public class HttpClient {
         return httpResponse;
     }
 
-    public FullHttpResponse get(String uri, List<DockerHeader> headers) {
+    public FullHttpResponse get(String uri, List<DockerHeader> headers) throws IOException, InterruptedException {
         FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri);
         request.headers().set(HttpHeaderNames.HOST, "netty");
         request.headers().set(HttpHeaderNames.ACCEPT, "Accept: */*");
@@ -39,14 +39,10 @@ public class HttpClient {
             });
         }
 
-        try {
-            return unixSocketClient.syncSend(request);
-        } catch (Exception e) {
-            return null;
-        }
+        return unixSocketClient.syncSend(request);
     }
 
-    public FullHttpResponse post(String uri, List<DockerHeader> headers) {
+    public FullHttpResponse post(String uri, List<DockerHeader> headers) throws IOException, InterruptedException {
         FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri);
         request.headers().set(HttpHeaderNames.HOST, "netty");
         request.headers().set(HttpHeaderNames.ACCEPT, "Accept: */*");
@@ -56,14 +52,10 @@ public class HttpClient {
             });
         }
 
-        try {
-            return unixSocketClient.syncSend(request);
-        } catch (Exception e) {
-            return null;
-        }
+        return unixSocketClient.syncSend(request);
     }
 
-    public FullHttpResponse postJson(String uri, List<DockerHeader> headers, String json) {
+    public FullHttpResponse postJson(String uri, List<DockerHeader> headers, String json) throws IOException, InterruptedException {
         ByteBuf byteBuf = Unpooled.copiedBuffer(json, StandardCharsets.UTF_8);
 
         FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri);
@@ -77,35 +69,25 @@ public class HttpClient {
             });
         }
         request.content().clear().writeBytes(byteBuf);
-
-        try {
-            return unixSocketClient.syncSend(request);
-        } catch (Exception e) {
-            return null;
-        }
+        return unixSocketClient.syncSend(request);
     }
 
-    public FullHttpResponse postFile(String uri, List<DockerHeader> headers, File file) {
-        try {
-            byte[] bytes = transfer(file);
-            long len = bytes.length;
-            ByteBuf content = Unpooled.wrappedBuffer(bytes);
-
-            FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri, content);
-            request.headers().set(HttpHeaderNames.HOST, "netty");
-            request.headers().set(HttpHeaderNames.ACCEPT, "Accept: */*");
-            request.headers().set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(len));
-            if (headers != null) {
-                headers.forEach(header -> {
-                    request.headers().set(header.getName(), header.getValue());
-                });
-            }
-
-            return unixSocketClient.syncSend(request);
-        } catch (Exception e) {
-            e.printStackTrace();
-            return null;
+    public FullHttpResponse postFile(String uri, List<DockerHeader> headers, File file) throws IOException, InterruptedException {
+        byte[] bytes = transfer(file);
+        long len = bytes.length;
+        ByteBuf content = Unpooled.wrappedBuffer(bytes);
+
+        FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri, content);
+        request.headers().set(HttpHeaderNames.HOST, "netty");
+        request.headers().set(HttpHeaderNames.ACCEPT, "Accept: */*");
+        request.headers().set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(len));
+        if (headers != null) {
+            headers.forEach(header -> {
+                request.headers().set(header.getName(), header.getValue());
+            });
         }
+
+        return unixSocketClient.syncSend(request);
     }
 
     private byte[] transfer(File file) throws IOException {
@@ -121,7 +103,7 @@ public class HttpClient {
         return bos.toByteArray();
     }
 
-    public FullHttpResponse delete(String uri, List<DockerHeader> headers) {
+    public FullHttpResponse delete(String uri, List<DockerHeader> headers) throws IOException, InterruptedException {
         FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.DELETE, uri);
         request.headers().set(HttpHeaderNames.HOST, "netty");
         request.headers().set(HttpHeaderNames.ACCEPT, "Accept: */*");
@@ -131,11 +113,7 @@ public class HttpClient {
             });
         }
 
-        try {
-            return unixSocketClient.syncSend(request);
-        } catch (Exception e) {
-            return null;
-        }
+        return unixSocketClient.syncSend(request);
     }
 
     public void close() {

+ 2 - 2
common/src/main/java/cn/reghao/autodop/common/dockerc/unixdomain/UnixSocketClient.java

@@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit;
 public class UnixSocketClient {
     private static final Logger log = LoggerFactory.getLogger(UnixSocketClient.class);
     // TODO 在配置中指定
-    private final String DEFAULT_UNIX_ENDPOINT = "/var/run/docker.sock";
+    private final String DEFAULT_UNIX_ENDPOINT = "/var/run/docker1.sock";
     private EventLoopGroup group;
     private Bootstrap bootstrap;
     private DomainSocketAddress unixSock;
@@ -73,7 +73,7 @@ public class UnixSocketClient {
 
     public FullHttpResponse syncSend(FullHttpRequest request) throws InterruptedException, IOException {
         if (channel == null) {
-            throw new IOException("Unix Socket 连接失败...");
+            throw new IOException("Unix Socket 连接失败, 检查 " + DEFAULT_UNIX_ENDPOINT + " 是否存在...");
         }
 
         ChannelPromise promise = clientHandler.sendRequest(request);

+ 0 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/app/App.java

@@ -33,7 +33,6 @@ public class App {
                 try {
                     dockerAppServiceImpl.deploy(appDeployArgs);
                 } catch (DockerException e) {
-                    log.error("docker 部署异常...");
                     return RpcResult.fail(ExceptionUtil.errorMsg(e));
                 }
             case zip:

+ 3 - 0
dagent/src/main/java/cn/reghao/autodop/dagent/app/ZipAppServiceImpl.java

@@ -3,6 +3,7 @@ package cn.reghao.autodop.dagent.app;
 import cn.reghao.autodop.common.dagent.app.api.data.AppStatus;
 import cn.reghao.autodop.common.dagent.app.api.data.deploy.AppDeployArgs;
 import cn.reghao.autodop.common.dagent.app.api.data.log.AppLogArgs;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -11,9 +12,11 @@ import java.util.List;
  * @author reghao
  * @date 2021-02-22 16:21:37
  */
+@Slf4j
 public class ZipAppServiceImpl implements AppService {
     @Override
     public void deploy(AppDeployArgs appDeployArgs) {
+        log.info("zip 部署...");
     }
 
     @Override

+ 5 - 5
dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/AppDispatcher.java

@@ -34,13 +34,13 @@ public class AppDispatcher {
                 log.info("返回应用状态...");
                 return app.status(payload);
             case appRestartOps:
-                log.info("返回应用状态...");
-                return app.status(payload);
+                log.info("重启应用...");
+                return app.restart(payload);
             case appStopOps:
-                log.info("返回应用状态...");
-                return app.status(payload);
+                log.info("停止应用...");
+                return app.stop(payload);
             case appStartOps:
-                log.info("返回应用状态...");
+                log.info("启动应用...");
                 return app.status(payload);
             default:
                 log.error("AppOps 中没有相应类型...");