百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术流 > 正文

Java多线程怎么实现FTP批量上传文件-Golang学习网

citgpt 2024-10-13 04:07 10 浏览 0 评论

1、构建FTP客户端

package cn.com.pingtech.common.ftp;

Java多线程怎么实现FTP批量上传文件-Golang学习网

 

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.net.ftp.FTPClient;

import org.apache.commons.net.ftp.FTPReply;

 

import java.io.*;

import java.net.UnknownHostException;

 

@Slf4j

public class  FtpConnection {

 

    private FTPClient ftp = new FTPClient();

 

    private boolean is_connected = false;

 

    /**

     * 构造函数

     */

    public FtpConnection() {

        is_connected = false;

        ftp.setDefaultTimeout(FtpConfig.defaultTimeoutSecond * 1000);

        ftp.setConnectTimeout(FtpConfig.connectTimeoutSecond * 1000);

        ftp.setDataTimeout(FtpConfig.dataTimeoutSecond * 1000);

        try {

            initConnect(FtpConfig.host, FtpConfig.port, FtpConfig.user, FtpConfig.password);

        } catch (IOException e) {

            e.printStackTrace();

        }

    }

 

    /**

     * 初始化连接

     *

     * @param host

     * @param port

     * @param user

     * @param password

     * @throws IOException

     */

    private void initConnect(String host, int port, String user, String password) throws IOException {

        try {

            ftp.connect(host, port);

        } catch (UnknownHostException ex) {

            throw new IOException("Can't find FTP server '" + host + "'");

        }

        int reply = ftp.getReplyCode();//220 连接成功

        if (!FTPReply.isPositiveCompletion(reply)) {

            disconnect();

            throw new IOException("Can't connect to server '" + host + "'");

 

        }

        if (!ftp.login(user, password)) {

            is_connected = false;

            disconnect();

            throw new IOException("Can't login to server '" + host + "'");

        } else {

            is_connected = true;

        }

    }

 

    /**

     * 上传文件

     *

     * @param path

     * @param ftpFileName

     * @param localFile

     * @throws IOException

     */

    public boolean upload(String path, String ftpFileName, File localFile) throws IOException {

        boolean is  = false;

        //检查本地文件是否存在

        if (!localFile.exists()) {

            throw new IOException("Can't upload '" + localFile.getAbsolutePath() + "'. This file doesn't exist.");

        }

        //设置工作路径

        setWorkingDirectory(path);

        //上传

        InputStream in = null;

        try {

            //被动模式

            ftp.enterLocalPassiveMode();

            in = new BufferedInputStream(new FileInputStream(localFile));

            //保存文件

            is = ftp.storeFile(ftpFileName, in);

        }catch (Exception e){

            e.printStackTrace();

        }

        finally {

            try {

                in.close();

            } catch (IOException ex) {

                ex.printStackTrace();

            }

        }

        return is;

    }

 

    /**

     * 关闭连接

     *

     * @throws IOException

     */

    public void disconnect() throws IOException {

        if (ftp.isConnected()) {

            try {

                ftp.logout();

                ftp.disconnect();

                is_connected = false;

            } catch (IOException ex) {

                ex.printStackTrace();

            }

        }

    }

 

    /**

     * 设置工作路径

     *

     * @param dir

     * @return

     */

    private boolean setWorkingDirectory(String dir) {

        if (!is_connected) {

            return false;

        }

        //如果目录不存在创建目录

        try {

            if (createDirecroty(dir)) {

                return ftp.changeWorkingDirectory(dir);

            }

        } catch (IOException e) {

            e.printStackTrace();

        }

        return false;

 

    }

 

    /**

     * 是否连接

     *

     * @return

     */

    public boolean isConnected() {

        return is_connected;

    }

 

    /**

     * 创建目录

     *

     * @param remote

     * @return

     * @throws IOException

     */

    private boolean createDirecroty(String remote) throws IOException {

        boolean success = true;

        String directory = remote.substring(0, remote.lastIndexOf("/") + 1);

        // 如果远程目录不存在,则递归创建远程服务器目录

        if (!directory.equalsIgnoreCase("/") && !ftp.changeWorkingDirectory(new String(directory))) {

            int start = 0;

            int end = 0;

            if (directory.startsWith("/")) {

                start = 1;

            } else {

                start = 0;

            }

            end = directory.indexOf("/", start);

            while (true) {

                String subDirectory = new String(remote.substring(start, end));

                if (!ftp.changeWorkingDirectory(subDirectory)) {

                    if (ftp.makeDirectory(subDirectory)) {

                        ftp.changeWorkingDirectory(subDirectory);

                    } else {

                        log.error("mack directory error :/" + subDirectory);

                        return false;

                    }

                }

                start = end + 1;

                end = directory.indexOf("/", start);

                // 检查所有目录是否创建完毕

                if (end <= start) {

                    break;

                }

            }

        }

        return success;

    }

 

}

2、FTP连接工厂

package cn.com.pingtech.common.ftp;

 

import lombok.extern.slf4j.Slf4j;

 

import java.io.IOException;

import java.util.concurrent.ArrayBlockingQueue;

 

 

/**

 * 连接工厂

 

 */

@Slf4j

public class FtpFactory {

 

    //有界队列

    private static final ArrayBlockingQueue<FtpConnection> arrayBlockingQueue = new ArrayBlockingQueue<>(FtpConfig.ftpConnectionSize);

 

 

    protected FtpFactory(){

        log.info("init ftpConnectionSize "+FtpConfig.ftpConnectionSize);

        for(int i = 0; i< FtpConfig.ftpConnectionSize; i++){

            //表示如果可能的话,将 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则返回 false

            arrayBlockingQueue.offer(new FtpConnection());

        }

    }

 

    /**

     * 获取连接

     *

     * @return

     */

 

    public FtpConnection getFtp() {

        FtpConnection poll = null;

        try {

            //取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 Blocking 有新的对象被加入为止

            poll = arrayBlockingQueue.take();

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        return poll;

    }

 

    /**

     * 释放连接

     * @param ftp

     * @return

     */

    public boolean relase(FtpConnection ftp){

        return arrayBlockingQueue.offer(ftp);

    }

 

    /**

     * 删除连接

     *

     * @param ftp

     */

 

    public void remove(FtpConnection ftp) {

        arrayBlockingQueue.remove(ftp);

    }

 

    /**

     * 关闭连接

     */

    public void close() {

        for (FtpConnection connection : arrayBlockingQueue) {

            try {

                connection.disconnect();

            } catch (IOException e) {

                e.printStackTrace();

            }

        }

    }

 

 

}

3、FTP配置

package cn.com.pingtech.common.ftp;

 

/**

 * ftp 配置类

 */

 

public class FtpConfig {

 

    public static int defaultTimeoutSecond = 10;

    public static int connectTimeoutSecond = 10;

    public static int dataTimeoutSecond = 10;

    public static String host = "127.0.0.1";

    public static int port =9999;

    public static String user = "Administrator";

    public static String password ="Yp886611";

    public static int threadPoolSize = 1;

    public static int ftpConnectionSize = 1;

     

}

4、构建多线程FTP上传任务

package cn.com.pingtech.common.ftp;

 

import java.io.File;

import java.io.IOException;

import java.util.concurrent.Callable;

 

 

/**

 * 上传任务

 */

public class UploadTask implements Callable{

    private File file;

 

    private FtpConnection ftp;

 

    private String path;

 

    private String fileName;

 

    private FtpFactory factory;

 

    public UploadTask(FtpFactory factory,FtpConnection ftp, File file, String path, String fileName){

 

        this.factory = factory;

 

        this.ftp = ftp;

 

        this.file = file;

 

        this.path = path;

 

        this.fileName = fileName;

 

    }

 

    @Override

    public UploadResult call() throws Exception {

        UploadResult result = null;

        try {

            if (ftp == null) {

                result = new UploadResult(file.getAbsolutePath(), false);

                return result;

            }

            //如果连接未开启 重新获取连接

            if (!ftp.isConnected()) {

                factory.remove(ftp);

                ftp = new FtpConnection();

            }

 

            //开始上传

            result = new UploadResult(file.getName(), ftp.upload(path, fileName, file));

        } catch (IOException ex) {

            result = new UploadResult(file.getName(), false);

            ex.printStackTrace();

        } finally {

            factory.relase(ftp);//释放连接

        }

        return result;

 

    }

}

package cn.com.pingtech.common.ftp;

/**

 * 上传结果

 */

public class UploadResult {

    private String fileName; //文件名称

    private boolean result; //是否上传成功

 

    public UploadResult(String fileName, boolean result) {

        this.fileName = fileName;

        this.result = result;

    }

 

    public String getFileName() {

        return fileName;

 

    }

 

    public void setFileName(String fileName) {

        this.fileName = fileName;

    }

 

    public boolean isResult() {

        return result;

    }

 

    public void setResult(boolean result) {

        this.result = result;

    }

 

    public String toString() {

        return "[fileName=" + fileName + " , result=" + result + "]";

    }

}

注意:实现Callable接口的任务线程能返回执行结果

Callable接口支持返回执行结果,此时需要调用FutureTask.get()方法实现,此方法会阻塞线程直到获取“将来”的结果,当不调用此方法时,主线程不会阻塞


5、FTP上传工具类

package cn.com.pingtech.common.ftp;

 

import java.io.File;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;

 

 

/**

 * ftp上传工具包

 */

 

public class FtpUtil {

 

    /**

     * 上传文件

     *

     * @param ftpPath

     * @param listFiles

     * @return

     */

 

    public static synchronized List upload(String ftpPath, File[] listFiles) {

        //构建线程池

        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(FtpConfig.threadPoolSize);

        List<Future> results = new ArrayList<>();

        //创建n个ftp链接

        FtpFactory factory = new FtpFactory();

        for (File file : listFiles) {

            FtpConnection ftp = factory.getFtp();//获取ftp con

            UploadTask upload = new UploadTask(factory,ftp, file, ftpPath, file.getName());

            Future submit = newFixedThreadPool.submit(upload);

            results.add(submit);

        }

 

        List listResults = new ArrayList<>();

        for (Future result : results) {

            try {

                //获取线程结果

                UploadResult uploadResult = (UploadResult)result.get(30, TimeUnit.MINUTES);

                listResults.add(uploadResult);

            } catch (Exception e) {

                e.printStackTrace();

            }

        }

        factory.close();

        newFixedThreadPool.shutdown();

        return listResults;

    }

 

}

6、测试上传

package cn.com.pingtech.common.ftp

 

 

class Client {

    public static void main(String[] args) throws IOException {

        String loalPath = "C:\\Users\\Administrator\\Desktop\\test\\0";

        String ftpPath = "/data/jcz/";

        File parentFile = new File(loalPath);

        List <UploadResult> list = FtpUtil.upload(ftpPath,parentFile.listFiles());

        for(UploadResult vo:list){

            System.out.println(vo);

        }

         

    }

}

注意:FTP协议里面,规定文件名编码为iso-8859-1,所以目录名或文件名需要转码


相关推荐

js中arguments详解

一、简介了解arguments这个对象之前先来认识一下javascript的一些功能:其实Javascript并没有重载函数的功能,但是Arguments对象能够模拟重载。Javascrip中每个函数...

firewall-cmd 常用命令

目录firewalldzone说明firewallzone内容说明firewall-cmd常用参数firewall-cmd常用命令常用命令 回到顶部firewalldzone...

epel-release 是什么

EPEL-release(ExtraPackagesforEnterpriseLinux)是一个软件仓库,它为企业级Linux发行版(如CentOS、RHEL等)提供额外的软件包。以下是关于E...

FullGC详解  什么是 JVM 的 GC
FullGC详解 什么是 JVM 的 GC

前言:背景:一、什么是JVM的GC?JVM(JavaVirtualMachine)。JVM是Java程序的虚拟机,是一种实现Java语言的解...

2024-10-26 08:50 citgpt

使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
跨域(CrossOrigin)

1.介绍  1)跨域问题:跨域问题是在网络中,当一个网络的运行脚本(通常时JavaScript)试图访问另一个网络的资源时,如果这两个网络的端口、协议和域名不一致时就会出现跨域问题。    通俗讲...

微服务架构和分布式架构的区别

1、含义不同微服务架构:微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信采用轻量级通信机制(通常用HTTP资源API)。这些服务围绕业务能力构建并...

深入理解与应用CSS clip-path 属性
深入理解与应用CSS clip-path 属性

clip-pathclip-path是什么clip-path 是一个CSS属性,允许开发者创建一个剪切区域,从而决定元素的哪些部分可见,哪些部分会被隐...

2024-10-25 11:51 citgpt

HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
Request.ServerVariables 大全

Request.ServerVariables("Url")返回服务器地址Request.ServerVariables("Path_Info")客户端提供的路...

python操作Kafka

目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的kafka二、python操作kafka细...

Runtime.getRuntime().exec详解

Runtime.getRuntime().exec详解概述Runtime.getRuntime().exec用于调用外部可执行程序或系统命令,并重定向外部程序的标准输入、标准输出和标准错误到缓冲池。...

promise.all详解 promise.all是干什么的
promise.all详解 promise.all是干什么的

promise.all详解promise.all中所有的请求成功了,走.then(),在.then()中能得到一个数组,数组中是每个请求resolve抛出的结果...

2024-10-24 16:21 citgpt

Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解

取消回复欢迎 发表评论: