2
回答
使用 Spring Batch 和 SFTP 安全地发送数据
华为云实践训练营,热门技术免费实践!>>>   

使用 Spring Batch 发送数据有很多种方法,最常见的就是使用 XML 文件来发送,但该框架还提供很多其他的方式,其中包括通过 SFTP 服务器发送数据。

首先,和其他方式一样,我们需要创建一个 “Reader” 和 “Writer”.

Reader:

<!-- JDBC reader: runs the SQL statement against dataSource and maps every row
     with SftpFileRowMapper. The class attribute is required for the container
     to instantiate the bean. -->
<bean id="sftpFileReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
  <property name="dataSource" ref="dataSource" />
  <property name="sql" value="SELECT * FROM table" />
  <property name="rowMapper">
    <bean class="de.package.rowmapper.SftpFileRowMapper" />
  </property>
</bean>

Reader 所使用的 RowMapper

package de.package.rowmapper;
 
import java.sql.ResultSet;
import java.sql.SQLException;
 
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jdbc.core.RowMapper;
 
import de.package.domainObjects.SftpFileObject;
 
public class SftpFileRowMapper implements RowMapper {
 
    public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
        SftpFileObject fileLine = new SftpFileObject();
        try {
            fileLine.setDbField1(rs.getString("dbField1"));
            fileLine.setDbField2(rs.getString("dbField2"));
            fileLine.setDbField3(rs.getString("dbField3"));
            fileLine.setDbField4(rs.getString("dbField4"));
            fileLine.setDbField5(rs.getString("dbField5"));
        } catch (SQLException e) {
            System.out.println("Can't create data row for export File.");
        }
        return fileLine;
    }
}

Reader 所使用的 DomainObject 类

package de.package.domainObjects;
 
/**
 * Serializable value holder for one line of the export file.
 *
 * <p>Carries five string fields, each exposed both as a public field and through
 * a getter/setter pair.
 */
public class SftpFileObject implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    // NOTE(review): the fields are public in addition to having accessors;
    // they are kept public here so that existing direct field access keeps working.
    public String dbField1;
    public String dbField2;
    public String dbField3;
    public String dbField4;
    public String dbField5;

    // ---- getters ----

    /** @return the current value of {@code dbField1} */
    public String getDbField1() {
        return this.dbField1;
    }

    /** @return the current value of {@code dbField2} */
    public String getDbField2() {
        return this.dbField2;
    }

    /** @return the current value of {@code dbField3} */
    public String getDbField3() {
        return this.dbField3;
    }

    /** @return the current value of {@code dbField4} */
    public String getDbField4() {
        return this.dbField4;
    }

    /** @return the current value of {@code dbField5} */
    public String getDbField5() {
        return this.dbField5;
    }

    // ---- setters ----

    /** @param dbField1 new value for {@code dbField1} */
    public void setDbField1(String dbField1) {
        this.dbField1 = dbField1;
    }

    /** @param dbField2 new value for {@code dbField2} */
    public void setDbField2(String dbField2) {
        this.dbField2 = dbField2;
    }

    /** @param dbField3 new value for {@code dbField3} */
    public void setDbField3(String dbField3) {
        this.dbField3 = dbField3;
    }

    /** @param dbField4 new value for {@code dbField4} */
    public void setDbField4(String dbField4) {
        this.dbField4 = dbField4;
    }

    /** @param dbField5 new value for {@code dbField5} */
    public void setDbField5(String dbField5) {
        this.dbField5 = dbField5;
    }
}

Writer:

<!-- Flat-file writer: writes one semicolon-delimited line per item to the CSV
     resource, preceded by the header produced by HeaderCallback. The class
     attribute is required for the container to instantiate the bean. -->
<bean id="sftpFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
  <property name="resource" value="file:path/to/file/filename.csv" />
  <property name="encoding" value="ISO-8859-1" />
  <property name="headerCallback">
    <bean class="de.package.helper.HeaderCallback" />
  </property>
  <property name="lineAggregator">
    <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
      <property name="delimiter" value=";" />
      <property name="fieldExtractor">
        <!-- Reads the listed bean properties from each item, in this order. -->
        <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
          <property name="names" value="dbField1, dbField2, dbField3, dbField4, dbField5" />
        </bean>
      </property>
    </bean>
  </property>
</bean>

接下来只需要 HeaderCallback :

package de.package.helper;
 
import java.io.IOException;
import java.io.Writer;
import org.springframework.batch.item.file.FlatFileHeaderCallback;
 
/**
 * Header callback that writes the column headline row of the export file.
 */
public class HeaderCallback implements FlatFileHeaderCallback {

    /** Semicolon-separated column headlines, written as a single line without a line break. */
    private static final String HEADER_LINE =
            "FieldHeadline1;FieldHeadline2;FieldHeadline3;FieldHeadline4;FieldHeadline5";

    /**
     * Writes the headline row to the given writer.
     *
     * @param writer target to write the header to
     * @throws IOException if the writer fails
     */
    @Override
    public void writeHeader(Writer writer) throws IOException {
        writer.write(HEADER_LINE);
    }
}

如果要发送文件到 SFTP 服务器,我们需要一个 SftpSessionFactory 类,包含访问服务器的信息:

<!-- Session factory holding the connection data (host, port, user, password)
     of the receiving SFTP server. NOTE(review): the values look like
     placeholders; real credentials should presumably come from external
     configuration. -->
<bean id="sftpSessionFactory"
      class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <property name="host" value="host.of.receiver"/>
    <property name="port" value="22"/>
    <property name="user" value="username"/>
    <property name="password" value="secureSftpPassword"/>
</bean>

然后我们需要定义一个 channel 用来发送数据:

<!-- Message channel that connects the sending tasklet (sftpChannel property)
     with the SFTP outbound adapter (channel attribute). -->
<int:channel id="outputChannel" />

紧接着我们需要告诉 Spring Batch 发送的方法

<!-- Outbound SFTP adapter: takes messages from outputChannel and uploads them
     into /target using sessions from sftpSessionFactory; the remote file name
     is supplied by fileNameGenerator. -->
<int-sftp:outbound-channel-adapter
    id="sftpOutboundAdapter"
    channel="outputChannel"
    session-factory="sftpSessionFactory"
    remote-directory="/target"
    remote-filename-generator="fileNameGenerator"
    charset="UTF-8" />

为了简便,服务器上存储的文件名跟本地一致,因此需要定义 DefaultFileNameGenerator

<!-- File name generator referenced by the SFTP outbound adapter; per the
     accompanying text the remote file keeps the name of the local file. -->
<bean id="fileNameGenerator" class="org.springframework.integration.file.DefaultFileNameGenerator" />
要真正地发送文件,我们还需要一个 Tasklet 和 BatchJob:
<!-- Tasklet bean that sends the generated export file through outputChannel.
     The class attribute must be a plain fully-qualified class name, and the
     file name must not start with a blank, otherwise the file is never found. -->
<bean id="sftpJobTasklet" class="de.package.tasklets.SftpTasklet">
  <property name="fileName" value="path/to/file/filename.csv" />
  <property name="sftpChannel" ref="outputChannel" />
</bean>

 

接下来真正发送文件的代码就很简单了:

package de.package.tasklets;
 
import java.io.File;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.support.MessageBuilder;
 
public class SftpTasklet implements Tasklet {
 
    private String fileName;
    private MessageChannel sftpChannel;
 
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
 
        File file = new File(fileName);
 
        if (file.exists()) {
            Message&lt;File&gt; message = MessageBuilder.withPayload(file).build();
            try {
                sftpChannel.send(message);
            } catch (Exception e) {
                System.out.println("Could not send file per SFTP: " + e);
            }
        } else {
            System.out.println("File does not exist.");
        }
 
    return RepeatStatus.FINISHED;
  }
 
  public String getFileName() {
    return fileName;
  }
 
  public void setFileName(String fileName) {
    this.fileName = fileName;
  }
 
  public MessageChannel getSftpChannel() {
    return sftpChannel;
  }
 
  public void setSftpChannel(MessageChannel sftpChannel) {
    this.sftpChannel = sftpChannel;
  }
 
}

然后定义 BatchJob:

<!-- Two-step job: the first step reads the rows and writes the export file in
     chunks of 100 items, the second step sends that file with sftpJobTasklet.
     The reader attribute must match the id of the reader bean (sftpFileReader).
     NOTE(review): the fileNameListener bean is not defined in the snippets
     shown here and has to be provided elsewhere. -->
<batch:job id="sftpJob" restartable="false">
  <batch:step id="sftpFileGenerateStep" next="sftpFileSendStep">
    <batch:tasklet>
      <batch:chunk reader="sftpFileReader" writer="sftpFileWriter" commit-interval="100" />
      <batch:listeners>
        <batch:listener ref="fileNameListener" />
      </batch:listeners>
    </batch:tasklet>
  </batch:step>
  <batch:step id="sftpFileSendStep">
    <batch:tasklet ref="sftpJobTasklet" />
  </batch:step>
</batch:job>
BatchJob 可通过服务器上的 CronJob 来启动,我们需要在 “applicationContext.xml” 增加如下内容:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Root element of applicationContext.xml: declares the beans, batch and
     integration namespaces used by the snippets, plus the xsi namespace that
     the schemaLocation attribute requires. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file-2.0.xsd
http://www.springframework.org/schema/integration/sftp http://www.springframework.org/schema/integration/sftp/spring-integration-sftp-2.0.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.0.xsd">
举报
鉴客
发帖于6年前 2回/1K+阅
顶部