メインコンテンツまでスキップ
Version: 3.4.4

開発者ガイド

文書では、BXI File Agent Serverで定義して使用されているProtocolと、提供する詳細コマンドを説明する。そして、サイト適用プロセスでカスタマイズが必要な場合に参考にできる情報を提供する。

エージェントサーバー運用のために必要な情報は、下記の文書を参照する。

Operating documentation : User Guide

Frame Message Protocol

Agent Serverの要求と応答メッセージは、下記のように、構成される。

Protocol Layout

PartElementsDescription
COMMAND{GETPUT
DEFINED HEADERAgent、Remote、Agent-Type、Response-Code、Reason、Connection、Content-Length、Deleted-Count、Transfer-Encoding、Transfer-Source-Uri、Transfer-Destination-Uri、Destination-Agent、Transfer-Validation、Transfer-Timeout-Seconds、Transfer-Before-Script、Transfer-After-Script、Transfer-Interceptor、Merge-Resource、Session-Id、Resource-Length、Transferred-Resource要求処理と応答に必要な情報のために、予め定義されているヘッダー項目
CUSTOM HEADERex) Request-AgentUserカスタマイズされる前後処理機に伝達されて使用するユーザー定義ヘッダー項目で、任意の名前と値として追加されることがある。
CONTENTbytes contents送信/受信Contents

COMMANDS

  • INFO /health : Agent ServerにHealth情報を要求
  • INFO /info : Agent Serverにシステム情報を要求ト
  • INFO /exist : Agent Serverに特定のリソースが存在するか確認要求
  • ACTION /session : 1つのトランザクション(ex)大型リソース分割送信が順次、または並列に交換される複数の要求と応答で処理する必要がある場合、完全な処理を保証するために使用されるsessionの作成をAgent Serverに要求
  • ACTION /merge : 並列に分割送信された大型リソースのマージ要求
  • ACTION /shutdown : Agent Server Shutdown要求
  • GET : Agent Serverに特定リソースの送信を要求
  • PUT : Agent Serverに特定リソースを送信
  • LIST : Agent Serverの特定位置に存在するリソースリストを要求
  • DELETE : Agent Serverの特定位置に存在するリソース削除を要求
  • TRANSFER : Agent Serverの特定リソース別のAgent Server Instanceに送信するように要求

DEFINED HEADERS

下記では、ユーザーによって指定されたり理解する必要がある主なヘッダー項目を説明する。

Header ElementDescriptionExamples
Response-Code応答にのみ含まれ、要求に対する処理結果200(OK)、210(Not Exist)、400(Bad Request)、402(Bad Response)、404(Source File Not Found)、405(Already Exist)、500(Internal Server Error)、510(Destination Not Responding)、520(Transfer Failed)、525(Delete Failed)、530(File Permission Error)、540(TimeOut Occurred)
Reasonエラーに対する結果、または正常処理時に一部の情報を含むResponse-Code : 520 Transfer Failed、Reason : agent : [192.168.12.15:8024, 192.168.12.16:8024] are not responding
Content-Length伝達されるCONTENT領域のサイズContent-Length : 247549
Transfer-EncodingChunk単位で送信するかどうかTransfer-Encoding : chunked
Content-Length伝達されるCONTENT領域のサイズContent-Length : 247549
Resource-LengthEXIST command要求の結果でリソースが存在する場合、当該リソースの長さ情報Reason : EXIST、Resource-Length : 557
Deleted-CountDELETE command要求による処理結果Deleted-Count : 2、Reason : /agent/backup/20180717/check1.done、Reason : /agent/backup/20180717/check2.done
Transferred-ResourceTRANSFER command要求によってsource agentからtarget agentに送信された結果Reason : 2 files are transferred、0 files are failed、Transferred-Resource : /agent/2432345.dat、Transferred-Resource : /agent/32453.dat
Transfer-Source-Uriリソースを送信したり、送信を要求する際に対象となるリソースとパラメータで構成されるURITransfer-Source-Uri : agent://192.168.12.15:8025/fixed-content.txt?interceptor=simpleCustomTransferInterceptor\&afterTransfer=backup
Transfer-Destination-Uriリソースを送信したり、送信を要求する際に受信位置でリソースを保存する位置とパラメータで構成されるURITransfer-Destination-Uri : /backup/${date:now:yyyyMMdd}/${file:name.noext}.bak?interceptor=simpleCustomReceiveInterceptor\&createAck=.done\&onExist=overwriteOnExist
Transfer-Interceptor、Transfer-Before-Script、Transfer-After-ScriptTRANSFER commandと共に伝達される前後処理項目として処理を行うbean登録名をinterceptorに指定するか、python scriptの位置をscript項目に指定する。URIにParameterとして含まれている前後処理機は、送受信Agentの位置で送信と受信の前後に実行され、ヘッダー項目に含まれている前後処理指定はTRANSFER command処理前後に実行される。Transfer-Before-Script : replace.py arg1 arg2、Transfer-Interceptor : simpleCustomTransferInterceptor
Connection要求側にConnectionをcloseするかどうかで、closingは自動処理されるResponse-Code : 520、Transfer Failed、Reason : agent : [192.168.12.15:8024, 192.168.12.16:8024] are not responding、Connection : close

URI(Uniform Resource Identifier)を利用したパスとリソースを指定

URI構成

パスとリソースを指定するために、Header項目(Transfer-Source-Uri、Transfer-Destination-Uri...)にURIを指定する。 URIには送信前後処理(backup、delete、interceptor実行)などのオプションを指定するためのparameterを含む。

ex) agent://192.168.219.141:8024/9063C6480000.dat?&createAck=.done&onExist=overwriteOnExist

リソースのリスト要求と削除コマンドに使用されるURIには、ant style pattern文字列が指定できる。

ex) agent://192.168.219.141:8024/backup/**/*

パスURIには、下記の項目がパラメータに含まれることができる。

NameDescriptionExamples
site業務別にsiteを分離して運用中の場合、処理するリソースのsite Idsite=accountMgmt
createAckリソースを受信するAgentで受信完了ファイルを作成するかどうかと拡張子createAck=.done
afterTransferリソース送信Agentで送信完了後の処理afterTransfer=backup、afterTransfer=delete
beforeScriptcommand処理前に実行されるpython scriptbeforeScript=update.py arg1 arg2 arg3
afterScriptcommand処理後に実行されるpython scriptafterScript=clear.py arg1 arg2 arg3
interceptorcommand処理前後に実行される前後処理機Beanの登録名interceptor=simpleCustomReceiveInterceptor
onExist送信対象ファイルが受信Agentに存在する場合の処理方法onExist=overwriteOnExist、onExist=appendOnExist、onExist=failOnExist

実際のアプリケーションレベルでURIはユーザーによって直接作成されず、次のAPIを利用してエンコーディングされて作成される。

String uri= AgentMessageUtil.encodedUri( "192.168.42.24", 8025, "d324234.dat",
OptionParam.param( OptionParam.INTERCEPTOR, "simpleCustomTransferInterceptor"),
OptionParam.param( OptionParam.INTERCEPTOR, "simpleCustomTransferInterceptor"),
OptionParam.param( OptionParam.SITE, "accountMgmt"));

PATH Expression

パスURIを作成する際には、ダイナミックに作成する必要がある一部文字列を下記の例のように、expressionを利用して指定することができる。

example)

agent://192.168.219.141:8024/${property[agent.repository.sites.accountMgmt.backup-dir]}/${date:now-48h:yyyyMMdd}/${file:name.noext}.bak

evaluated : agent://192.168.219.141:8024/agent/accounts/backup/20180718/d3246.bak
..

agent://192.168.219.141:8024/${ref:staticCustomEvaluator}/${date:now-48h:yyyyMMdd}/${file:name.noext}.bak

evaluated : agent://192.168.219.141:8024/agent/custom_biz/toTransfer/20180718/d3246.bak

expression処理結果に代替される必要がある部分は、${で始まり}で完了する。

Agent Serverでは、次のexpression functionを提供する。

  • property : システム環境変数、JVMに伝達された実行因数、application.ymlに設定されたすべての項目値に代替
  • date : ${date:command:pattern}の形式で使用される。
    • command : 現在時間を読み込むnowと、これに接続される演算子に対応する。 ex) now-48h+60m-100s
    • pattern : 文字列に変換する際に、使用するパターン文字列を指定する。
  • file : ファイル名を基準に代替する文字列を作成する。処理するオリジナルのファイル名が存在しなければならないため、受信URIにのみ指定できる。
    • file:name : パスを除去したファイル名
    • file:name.noext : パスと拡張子を除去したファイル名
    • file:ext | file:name.ext : ファイルの拡張子
    • file:parent | file:path : ファイルパス
    • file:absolute.path : ファイルの絶対パスでparent、pathと同じ値に変換される。
    • file:length | file.size : ファイルのサイズ
    • file:modified : ファイルのlastModified long value
  • ref : config/custom-context.xmlに登録されているbxi.agent.file.language.simple.Expression typeのbeanを呼び出した結果の文字表現に変換される。bean呼び出し時に伝達されるExpressionContext isntanceには、Spring ApplicationContext instanceとRequest Headerに伝達されたすべての項目、そして現在処理中のファイルがある場合、ファイルの絶対パス情報が含まれている。

下記は参考用に作成され、配布パッケージに含まれているcustom evaluatorの実装である。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="staticCustomEvaluator" class="bxi.agent.file.sample.StaticCustomEvaluator">
<property name="staticValue" value="custom"/>
</bean>

</beans>
package bxi.agent.file.sample;

import java.util.List;

import bxi.agent.file.language.simple.Expression;
import bxi.agent.file.language.simple.ExpressionContext;
import bxi.agent.file.protocol.AgentHeaderNames;

public class StaticCustomEvaluator implements Expression
{
private String staticValue;

@SuppressWarnings( "unchecked")
@Override
public <T> T evaluate( ExpressionContext context, Class<T> type)
{
Object another= context.getApplicationContext().getBean( "another");
String path= context.getCurrentHandlingFilePath();
String contentLength= context.getHeader( AgentHeaderNames.CONTENT_LENGTH);
List<String> scripts= context.getHeaders().getAll( AgentHeaderNames.TRANSFER_AFTER_SCRIPT);

return (T)staticValue;
}

/**
* @return the staticValue
*/
public String getStaticValue()
{
return staticValue;
}

/**
* @param staticValue the staticValue to set
*/
public void setStaticValue( String staticValue)
{
this.staticValue= staticValue;
}

}


Agent Client

実際に配布されているAgent Serverを使用するアプリケーションでは、上記に説明したヘッダー項目をすべて明示的に指定して提供される機能を使用する必要はなく、より円滑に機能を使用できるようにClient API Setを提供する。

クライアント配布パッケージはdist/bxi-file-agent-1.0.1-client.zipで提供され、実行のために必要な依存ライブラリが含まれている。

下記に提供される主なAPIについての説明と使用例である。

simple example


// client作成後、接続を作成し接続を再使用する場合
AgentClient client= AgentClient.create( "192.168.219.141", 8024);
Channel channel= null;

try
{
channel= client.connect();

Health health= client.requestHealth( channel);
if( Status.UP.equals( health.getStatus()))
{
boolean answer= client.requestResourceExist( channel,
"/${date:now:yyyyMMdd}/parallel-content.jar", null);
logger.info( "result: {}", answer);
}
}
catch( Exception e)
{
logger.error( e.getMessage(), e);
}
finally
{
if( channel!= null) channel.close();
client.shutdown();
}

AgentClient client= AgentClient.create( "192.168.219.141", 8024);
try
{
boolean answer= client.requestResourceExist( AgentClient.THROWAWAY,
"/${date:now:yyyyMMdd}/parallel-content.jar", null);
logger.info( "result: {}", answer);
}
catch( Exception e)
{
logger.error( e.getMessage(), e);
}
finally
{
client.shutdown();
}

Agent Client APIs


// 1. Agent Serverの実行環境情報を要求
public Map<String, String> requestServerInfo( Channel channel) throws Exception

public Map<String, String> requestServerInfo( Channel channel, long timeout) throws Exception

// 2. Agent ServerのHealth状態を確認
public Health requestHealth( Channel channel) throws Exception

public Health requestHealth( Channel channel, long timeout) throws Exception

// 3. 特定のリソースがAgent Serverの特定の位置に存在するか確認
public boolean requestResourceExist( Channel channel, String path, String site) throws Exception

public boolean requestResourceExist( Channel channel, String path, String site, long timeout) throws Exception

// 4. Agent Serverから特定のリソースを要求して応答を処理
//Functional Interfaceを利用して応答コンテンツへの処理を作成する。
public <R> R requestGetResource( Channel channel, String path, String site,
Function<AgentFileData, R> operator, OptionParam... options) throws Exception

public <R> R requestGetResource( Channel channel, String path, String site,
Function<AgentFileData, R> operator, long timeout, OptionParam... options) throws Exception

// 以下は単純な使用例である。
AgentClient client= AgentClient.create( "192.168.219.141", 8024);

try
{
boolean answer= client.requestGetResource( AgentClient.THROWAWAY, "/${date:now:yyyyMMdd}/parallel-content.jar",
null, content->{
try
{
content.renameTo( new File( "/DATA/easymaster/Workings/systemd-agent/bxi-file-agent-1.0.1/get.jar"));
return true;
}
catch( IOException e)
{
return false;
}
}, new OptionParam[] {});
logger.info( "result: {}", answer);
}
catch( Exception e)
{
logger.error( e.getMessage(), e);
}
finally
{
client.shutdown();
}

// 5. 特定のリソースをAgent Serverから削除する。
//処理結果として正常に削除されたリソースリストをインポートする。.
public List<String> requestDeleteResources( Channel channel, String path, String site, OptionParam... options)
throws Exception

public List<String> requestDeleteResources( Channel channel, String path, String site,
long timeout, OptionParam... options) throws Exception

// 6. パスとパターン文字列を利用してAgent Serverからリソースのリストをインポートする。

public List<String> requestListResources( Channel channel, String path, String site, OptionParam... options)
throws Exception

public List<String> requestListResources( Channel channel, String path, String site,
long timeout, OptionParam... options) throws Exception

// 7. Agent Serverに特定ファイルを送信して保存するようにする。
public boolean requestPutResource( Channel channel,
File resource, String path, String site, OptionParam... options) throws Exception

public boolean requestPutResource( Channel channel, File resource, String path, String site,
long timeout, OptionParam... options) throws Exception

// 8. 大型ファイルを並列に処理してAgent Serverに送信する。
public boolean requestPutParallelResource( File resource, String path, String site, OptionParam... options)
throws Exception

// 並列処理でファイルを送信す時には、同時処理のために次のように、AgentClient作成時に適切なworker countを保存する必要がある。
AgentClient client= AgentClient.create( "192.168.219.141", 8028, 10);
try
{
File resource= ResourceUtils.getFile( "./src/test/resources/parallel-content.jar");
boolean answer= client.requestPutParallelResource( resource,
"/backup/${date:now:yyyyMMdd}/${file:name.noext}.bak", "biz1",
OptionParam.param( OptionParam.ON_EXIST, OptionParamValues.OVERWRITE_ONEXIST),
OptionParam.param( OptionParam.CREATE_ACK, ".ack"),
OptionParam.param( OptionParam.INTERCEPTOR, "simpleCustomReceiveInterceptor"));

logger.info( "result: {}", answer);
}
catch( Exception e)
{
logger.error( e.getMessage(), e);
}
finally
{
client.shutdown();
}

// 9. 特定Agent ServerにTransfer要求を送信して別のターゲットAgent Serverにリソースを転送するようにする。
public boolean requestTransferResources( Channel channel, List<TransferRequest> trans,
boolean sync, OptionParam... options) throws Exception

public boolean requestTransferResources( Channel channel, List<TransferRequest> trans,
boolean sync, long timeout, OptionParam... options) throws Exception

AgentClient client= AgentClient.create( "192.168.219.141", 8028); // 送信Agent Serverに接続

try
{
List<TransferRequest> trans= new ArrayList<TransferRequest>(); // 送信対象を複数個保存できる。
trans.add( new TransferRequest()
.from( client)
.resource( "/backup/${date:now:yyyyMMdd}/parallel-content.large", OptionParam.param( OptionParam.SITE, "biz1"))
.to( "192.168.219.141", 8024)
.to( "192.168.219.142", 8024) // 受信Agent Serverが複数個指定された場合、1番目のAgent Serverが応答しない場合は2番目のAgent Serverに送信を試みる。
.path( "${date:now:yyyyMMdd}/${file:name.noext}.jar",
OptionParam.param( OptionParam.INTERCEPTOR, "simpleCustomReceiveInterceptor"),
OptionParam.param( OptionParam.CREATE_ACK, ".done"),
OptionParam.param( OptionParam.ON_EXIST, OptionParamValues.OVERWRITE_ONEXIST)));

boolean answer= client.requestTransferResources( AgentClient.THROWAWAY, trans, true, -1,
OptionParam.param( OptionParam.INTERCEPTOR, "simpleCustomTransferInterceptor"));
logger.info( "result: {}", answer);
}
catch( Exception e)
{
logger.error( e.getMessage(), e);
}
finally
{
client.shutdown();
}

// 10.リモートAgent Serverをshutdownさせる。
public boolean requestShutdownCommand( Channel channel) throws Exception

// 11. 汎用的に使用できるように、ResponseComsumer Functional Interfaceを因子で受け取るAPI
// ResponseConsumer Functionで応答を伝達されたAgentMessageを処理できる。
public <R> R request( Channel channel, AgentMessage request, ResponseConsumer<AgentMessage, R> consumer)
throws RequestHandlerException, ResponseHandlerException

public <R> R request( final Channel channel, AgentMessage request, ResponseConsumer<AgentMessage, R> consumer,
long timeout) throws RequestHandlerException, ResponseHandlerException


User Defined Beans

System Interceptors Agent ServerにはServer Instance別に設定され、すべての要求前後に必ず実行される前後処理Interceptorを指定することができる。

下記は参考用に作成され、配布パッケージに含まれているcustom system interceptorの実装である。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="agentInterceptor" class="bxi.agent.file.sample.SimpleAgentInterceptor"/>

</beans>

package bxi.agent.file.sample;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import bxi.agent.file.processor.AgentInterceptor;
import bxi.agent.file.processor.TransferContext;

public class SimpleAgentInterceptor implements AgentInterceptor
{
private Logger logger= LoggerFactory.getLogger( SimpleAgentInterceptor.class);

@Override
public boolean preProcess( TransferContext context) throws Exception
{
logger.debug( "{} interceptor preProcess ........................... executed", getClass());
return true;
}

@Override
public void postProcess( TransferContext context) throws Exception
{
logger.debug( "{} interceptor postProcess ........................... executed", getClass());
}

@Override
public void afterCompletion( TransferContext context) throws Exception
{
logger.debug( "{} interceptor afterCompletion ........................... executed", getClass());
}
}

Request Interceptors

また、要求ごとにヘッダー(Transfer-Interceptor : TRANSFER command時に要求処理の前後に実行)に指定されるか、URI Parameter(interceptor=simpleCustomTransferInterceptor、送信URIに指定された場合は送信前後に実行され、受信URIに指定された場合は受信前後に実行される)に1つ以上指定して実行される。

bean呼び出し時に伝達されるTransferContext instanceには、Spring ApplicationContext instanceとRequest Headerに伝達されたすべての項目、そして現在処理中のファイルがある場合、ファイルの絶対パス情報が含まれている。

下記は参考用に作成され、配布パッケージに含まれているcustom request interceptorの実装である。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

<!-- 送信前後に実行されるInterceptor -->
<bean id="simpleCustomTransferInterceptor" class="bxi.agent.file.sample.SimpleCustomTransferInterceptor"/>

<!-- 受信前後に実行されるInterceptor -->
<bean id="simpleCustomReceiveInterceptor" class="bxi.agent.file.sample.SimpleCustomReceiveInterceptor"/>

</beans>

package bxi.agent.file.sample;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import bxi.agent.file.processor.TransferContext;
import bxi.agent.file.processor.TransferInterceptor;

public class SimpleCustomTransferInterceptor implements TransferInterceptor
{
private Logger logger= LoggerFactory.getLogger( SimpleCustomTransferInterceptor.class);

@Override
public boolean preTransfer( TransferContext context) throws Exception
{
logger.debug( "custom interceptoer {} preTransfer", SimpleCustomTransferInterceptor.class);
// check if required
return true;
}

@Override
public void postTransfer( TransferContext context, Exception cause) throws Exception
{
logger.debug( "custom interceptoer {} postTransfer", SimpleCustomTransferInterceptor.class);
// process if required
}

@Override
public void afterCompletion( TransferContext context) throws Exception
{
logger.debug( "custom interceptoer {} afterCompletion", SimpleCustomTransferInterceptor.class);
// process if required
}
}


package bxi.agent.file.sample;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import bxi.agent.file.processor.ReceiveInterceptor;
import bxi.agent.file.processor.TransferContext;

public class SimpleCustomReceiveInterceptor implements ReceiveInterceptor
{
private Logger logger= LoggerFactory.getLogger( SimpleCustomReceiveInterceptor.class);

@Override
public boolean preReceive( TransferContext context) throws Exception
{
logger.debug( "custom interceptoer {} preReceive", SimpleCustomReceiveInterceptor.class);
// check if required
return true;
}

@Override
public void postReceive( TransferContext context, Exception cause) throws Exception
{
logger.debug( "custom interceptoer {} postReceive", SimpleCustomReceiveInterceptor.class);
// process if required
}

@Override
public void afterCompletion( TransferContext context) throws Exception
{
logger.debug( "custom interceptoer {} afterCompletion", SimpleCustomReceiveInterceptor.class);
// process if required
}

}


Request Script Agent Serverの実行環境でpythonとして作成されているファイル処理の前後処理を再使用するか、またはscript適用が好まれる場合、これを使用できるようにAgent Serverでは要求の前後に実行されるScriptが指定された場合、これを実行する。

下記はテスト環境で前後処理に使用されたscriptの例である。 Spring bean形式で作成されたinterceptorを呼び出す時と同様に処理中のRequestのヘッダー項目と設定(application.yml)に指定された項目値、現在処理中のファイルのパス値にアクセスすることができる。


#!/usr/bin/python2.7

###
# Forwarded read-only attributes from agent
# AgentHeaders attributes
# Current-File
# Environment Properties
###

import sys

class SayHello:


def sayhello(self):
print '-------------------------------------------------------------------------'
print 'Number of arguments:', len(sys.argv), 'arguments.'
print 'Argument List:', str(sys.argv)
print 'Agent : '+ headers['Agent']
if( len(currentFiles) > 0):
print 'Current_File : '+ currentFiles[0]
print 'agent.repository.base-dir : '+ environment['agent.repository.base-dir']
print '-------------------------------------------------------------------------'
print '-------------------------------------------------------------------------'

def __init__(self, context, environment, headers):
if(context != None):
print 'invoke from agent - not command line'

def main():
SayHello(context, environment, headers).sayhello()

if __name__ == '__builtin__':
sys.argv = commandlineArgs
main()
elif __name__ == '__main__':
context = None
environment = None
headers = None
main()