Yanyg - Software Engineer

Hadoop RPC Code Analysis

Table of Contents

  • 1 Client
    • 1.1 DFSClient: Instantiation Example
    • 1.2 ClientNamenodeProtocolTranslatorPB
    • 1.3 ProtobufRpcEngine2
  • 2 Server
    • 2.1 Code Trace
  • 3 References

A typical RPC framework has to solve the following problems:

1 Client

Code: https://github.com/apache/hadoop/tree/trunk/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs

  • Two serialization engines: ProtobufRpcEngine and WritableRpcEngine;
  • class Call obtains a callId; callIdCounter maintains the call id and wraps around at 0x7FFFFFFF;
  • getConnection obtains a connection, and the request is then sent on it via connection.sendRpcRequest(call);
    • getRpcResponse retrieves the response;
    • The caller goes through connection.sendRpcRequest(call), which blocks the calling thread;
    • Connections are tracked in a ConcurrentMap<ConnectionId, Connection>;
    • The calls in flight on a Connection are tracked in a Hashtable<Integer, Call>;
    • Connection extends Thread; its background run() loop receives server responses;
      • run() -> while(waitForWork()) receiveRpcResponse();
    • During getConnection, setupIOstreams sets up the input/output streams, and authentication happens while the streams are being established (a simplified model of these structures is sketched below).
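
A minimal, hypothetical sketch of those structures (the real classes live in org.apache.hadoop.ipc.Client; MiniRpcClient and the plain String connection key are illustrative only): a wrap-around call-id counter, a caller that blocks on its Call, a per-connection table of in-flight calls, and a background receiver thread.

import java.util.Hashtable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MiniRpcClient {
  // callIdCounter wraps at 0x7FFFFFFF so the id never turns negative,
  // mirroring Client.nextCallId().
  private static final AtomicInteger callIdCounter = new AtomicInteger();

  static int nextCallId() {
    return callIdCounter.getAndIncrement() & 0x7FFFFFFF;
  }

  static class Call {
    final int id = nextCallId();
    private Object response;

    synchronized Object waitForResponse() throws InterruptedException {
      while (response == null) {
        wait();                       // the calling thread blocks here
      }
      return response;
    }

    synchronized void setResponse(Object r) {
      response = r;
      notifyAll();                    // wake the blocked caller
    }
  }

  // One Connection per remote endpoint; calls in flight are keyed by call id.
  static class Connection extends Thread {
    final Hashtable<Integer, Call> calls = new Hashtable<>();

    void sendRpcRequest(Call call) {
      calls.put(call.id, call);
      // ... serialize and write the request to the socket ...
    }

    @Override
    public void run() {
      // Simplified receive loop; the real one is
      // while (waitForWork()) { receiveRpcResponse(); }
    }
  }

  // Connections keyed by connection id (a plain String here for brevity).
  private final ConcurrentMap<String, Connection> connections =
      new ConcurrentHashMap<>();
}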

1.1 DFSClient: Instantiation Example

The DFSClient constructor creates proxyInfo and builds the namenode proxy:

if (proxyInfo != null) {
    // ...
} else if (rpcNamenode != null) {
    // This case is used for testing.
} else {
    Preconditions.checkArgument(nameNodeUri != null,
                                "null URI");
    proxyInfo = NameNodeProxiesClient.createProxyWithClientProtocol(
        conf,
        nameNodeUri, nnFallbackToSimpleAuth);
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy();
}

public static ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol(
    Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
    AbstractNNFailoverProxyProvider<ClientProtocol> failoverProxyProvider =
        createFailoverProxyProvider(conf, nameNodeUri, ClientProtocol.class,
                                    true, fallbackToSimpleAuth);

    if (failoverProxyProvider == null) {
        InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
        Text dtService = SecurityUtil.buildTokenService(nnAddr);
        ClientProtocol proxy = createNonHAProxyWithClientProtocol(
            nnAddr, conf,
            UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
        return new ProxyAndInfo<>(proxy, dtService, nnAddr);
    } else {
        return createHAProxy(conf, nameNodeUri, ClientProtocol.class,
                             failoverProxyProvider);
    }
}

public static ClientProtocol createNonHAProxyWithClientProtocol(
    InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
    boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
    return createProxyWithAlignmentContext(address, conf, ugi, withRetries,
                                           fallbackToSimpleAuth, null);
}

public static ClientProtocol createProxyWithAlignmentContext(
    InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
    boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
    AlignmentContext alignmentContext)
    throws IOException {
    RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
                          ProtobufRpcEngine2.class);

    final RetryPolicy defaultPolicy =
        RetryUtils.getDefaultRetryPolicy(
            conf,
            HdfsClientConfigKeys.Retry.POLICY_ENABLED_KEY,
            HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
            HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
            HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
            SafeModeException.class.getName());

    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
        NetUtils.getDefaultSocketFactory(conf),
        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
        fallbackToSimpleAuth, alignmentContext).getProxy();

    if (withRetries) { // create the proxy with retries
        Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>();
        ClientProtocol translatorProxy =
            new ClientNamenodeProtocolTranslatorPB(proxy);
        return (ClientProtocol) RetryProxy.create(
            ClientProtocol.class,
            new DefaultFailoverProxyProvider<>(ClientProtocol.class,
                                               translatorProxy),
            methodNameToPolicyMap,
            defaultPolicy);
    } else {
        return new ClientNamenodeProtocolTranslatorPB(proxy);
    }
}

The proxy construction above lives in NameNodeProxiesClient.createProxyWithClientProtocol and its helper methods.
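
From application code, all of this is hidden behind FileSystem. A usage sketch (the namenode address and file path below are made up) showing how user code ends up on the proxy path above:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCreateExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // FileSystem.get -> DistributedFileSystem.initialize -> new DFSClient(uri, conf),
    // which builds the ClientProtocol proxy via NameNodeProxiesClient as shown above.
    FileSystem fs = FileSystem.get(URI.create("hdfs://nn-host:8020"), conf);
    try (FSDataOutputStream out = fs.create(new Path("/tmp/rpc-example.txt"))) {
      out.writeBytes("hello hadoop rpc\n");
    }
  }
}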

1.2 ClientNamenodeProtocolTranslatorPB

/**
 * This class forwards NN's ClientProtocol calls as RPC calls to the NN server
 * while translating from the parameter types used in ClientProtocol to the
 * new PB types.
 */

Take create as an example; the chain below runs from the file system's create call down to the RPC translator:

public final FSDataOutputStream create(final Path f,
    final EnumSet<CreateFlag> createFlag, Options.CreateOpts... opts)
    throws AccessControlException, FileAlreadyExistsException,
    FileNotFoundException, ParentNotDirectoryException,
    UnsupportedFileSystemException, UnresolvedLinkException, IOException {
  checkPath(f);
  int bufferSize = -1;
  short replication = -1;
  long blockSize = -1;
  int bytesPerChecksum = -1;
  ChecksumOpt checksumOpt = null;
  FsPermission permission = null;
  Progressable progress = null;
  Boolean createParent = null;

  for (CreateOpts iOpt : opts) {
    if (CreateOpts.BlockSize.class.isInstance(iOpt)) {
      if (blockSize != -1) {
        throw new HadoopIllegalArgumentException(
            "BlockSize option is set multiple times");
      }
      blockSize = ((CreateOpts.BlockSize) iOpt).getValue();
    } else if (CreateOpts.BufferSize.class.isInstance(iOpt)) {
      if (bufferSize != -1) {
        throw new HadoopIllegalArgumentException(
            "BufferSize option is set multiple times");
      }
      bufferSize = ((CreateOpts.BufferSize) iOpt).getValue();
    } else if (...) {
      // ...
    } else {
      throw new HadoopIllegalArgumentException("Unkown CreateOpts of type " +
          iOpt.getClass().getName());
    }
  }

  // ...

  return this.createInternal(f, createFlag, permission, bufferSize,
    replication, blockSize, progress, checksumOpt, createParent);
}

@Override
public HdfsDataOutputStream createInternal(Path f,
    EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
    int bufferSize, short replication, long blockSize, Progressable progress,
    ChecksumOpt checksumOpt, boolean createParent) throws IOException {

  final DFSOutputStream dfsos = dfs.primitiveCreate(getUriPath(f),
    absolutePermission, createFlag, createParent, replication, blockSize,
    progress, bufferSize, checksumOpt);
  return dfs.createWrappedOutputStream(dfsos, statistics,
      dfsos.getInitialLen());
}

public DFSOutputStream primitiveCreate(String src, FsPermission absPermission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt) throws IOException {
  checkOpen();
  CreateFlag.validate(flag);
  DFSOutputStream result = primitiveAppend(src, flag, progress);
  if (result == null) {
    DataChecksum checksum = dfsClientConf.createChecksum(checksumOpt);
    result = DFSOutputStream.newStreamForCreate(this, src, absPermission,
        flag, createParent, replication, blockSize, progress, checksum,
        null, null, null);
  }
  beginFileLease(result.getFileId(), result);
  return result;
}

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, String ecPolicyName,
    String storagePolicy)
    throws IOException {
  try (TraceScope ignored =
           dfsClient.newPathTraceScope("newStreamForCreate", src)) {
    HdfsFileStatus stat = null;

    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    while (shouldRetry) {
      shouldRetry = false;
      try {
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName,
            storagePolicy);
        break;
      } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(
            AccessControlException.class,
            DSQuotaExceededException.class,
            QuotaByStorageTypeExceededException.class,
            FileAlreadyExistsException.class,
            FileNotFoundException.class,
            ParentNotDirectoryException.class,
            NSQuotaExceededException.class,
            RetryStartFileException.class,
            SafeModeException.class,
            UnresolvedPathException.class,
            SnapshotAccessControlException.class,
            UnknownCryptoProtocolVersionException.class);
        if (e instanceof RetryStartFileException) {
          if (retryCount > 0) {
            shouldRetry = true;
            retryCount--;
          } else {
            throw new IOException("Too many retries because of encryption" +
                " zone operations", e);
          }
        } else {
          throw e;
        }
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    final DFSOutputStream out;
    if(stat.getErasureCodingPolicy() != null) {
      out = new DFSStripedOutputStream(dfsClient, src, stat,
          flag, progress, checksum, favoredNodes);
    } else {
      out = new DFSOutputStream(dfsClient, src, stat,
          flag, progress, checksum, favoredNodes, true);
    }
    out.start();
    return out;
  }
}

// ClientNamenodeProtocolTranslatorPB.java
@Override
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
    String storagePolicy)
    throws IOException {
    CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
        .setSrc(src)
        .setMasked(PBHelperClient.convert(masked))
        .setClientName(clientName)
        .setCreateFlag(PBHelperClient.convertCreateFlag(flag))
        .setCreateParent(createParent)
        .setReplication(replication)
        .setBlockSize(blockSize);
    if (ecPolicyName != null) {
        builder.setEcPolicyName(ecPolicyName);
    }
    if (storagePolicy != null) {
        builder.setStoragePolicy(storagePolicy);
    }
    FsPermission unmasked = masked.getUnmasked();
    if (unmasked != null) {
        builder.setUnmasked(PBHelperClient.convert(unmasked));
    }
    builder.addAllCryptoProtocolVersion(
        PBHelperClient.convert(supportedVersions));
    CreateRequestProto req = builder.build();
    try {
        // rpcProxy was initialized in NameNodeProxiesClient.java via RPC.getProtocolProxy.
        CreateResponseProto res = rpcProxy.create(null, req);
        return res.hasFs() ? PBHelperClient.convert(res.getFs()) : null;
    } catch (ServiceException e) {
        throw ProtobufHelper.getRemoteException(e);
    }
}

// rpcProxy is a dynamic proxy: method calls bind to ProtobufRpcEngine2.Invoker#invoke
@Override
public Message invoke(Object proxy, final Method method, Object[] args)
    throws ServiceException {
    long startTime = 0;
    if (LOG.isDebugEnabled()) {
      startTime = Time.now();
    }

    if (args.length != 2) { // RpcController + Message
      throw new ServiceException(
          "Too many or few parameters for request. Method: ["
          + method.getName() + "]" + ", Expected: 2, Actual: "
          + args.length);
    }
    if (args[1] == null) {
      throw new ServiceException("null param while calling Method: ["
          + method.getName() + "]");
    }

    // if Tracing is on then start a new span for this rpc.
    // guard it in the if statement to make sure there isn't
    // any extra string manipulation.
    Tracer tracer = Tracer.curThreadTracer();
    TraceScope traceScope = null;
    if (tracer != null) {
      traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace(Thread.currentThread().getId() + ": Call -> " +
          remoteId + ": " + method.getName() +
          " {" + TextFormat.shortDebugString((Message) args[1]) + "}");
    }


    final Message theRequest = (Message) args[1];
    final RpcWritable.Buffer val;
    try {
      val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
          constructRpcRequest(method, theRequest), remoteId,
          fallbackToSimpleAuth, alignmentContext);

    } catch (Throwable e) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(Thread.currentThread().getId() + ": Exception <- " +
            remoteId + ": " + method.getName() +
              " {" + e + "}");
      }
      if (traceScope != null) {
        traceScope.addTimelineAnnotation("Call got exception: " +
            e.toString());
      }
      throw new ServiceException(e);
    } finally {
      if (traceScope != null) {
        traceScope.close();
      }
    }

    if (LOG.isDebugEnabled()) {
      long callTime = Time.now() - startTime;
      LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
    }

    if (Client.isAsynchronousMode()) {
      final AsyncGet<RpcWritable.Buffer, IOException> arr
          = Client.getAsyncRpcResponse();
      final AsyncGet<Message, Exception> asyncGet =
          new AsyncGet<Message, Exception>() {
            @Override
            public Message get(long timeout, TimeUnit unit) throws Exception {
              return getReturnMessage(method, arr.get(timeout, unit));
            }

            @Override
            public boolean isDone() {
              return arr.isDone();
            }
          };
      ASYNC_RETURN_MESSAGE.set(asyncGet);
      return null;
    } else {
      return getReturnMessage(method, val);
    }
}

The RPC request is handed off to rpcProxy; ProtobufRpcEngine2 creates that proxy:

1.3 ProtobufRpcEngine2

@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(
    Class<T> protocol, long clientVersion,
    ConnectionId connId, Configuration conf, SocketFactory factory)
    throws IOException {
    final Invoker invoker = new Invoker(protocol, connId, conf, factory);
    return new ProtocolProxy<T>(
        protocol, (T) Proxy.newProxyInstance(
                          protocol.getClassLoader(),
                          new Class[] {protocol}, invoker), false);
}
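
getProxy builds the proxy with java.lang.reflect.Proxy: every method call on the returned object is routed to the InvocationHandler, which here is Invoker (its invoke method is quoted in section 1.2). A self-contained toy example of that mechanism, with a hypothetical Greeter interface standing in for the protocol:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class DynamicProxyDemo {
  interface Greeter {
    String greet(String name);
  }

  public static void main(String[] args) {
    // In Hadoop this handler is Invoker#invoke: build the RPC request from
    // (method, args), send it via Client.call, and decode the response.
    InvocationHandler handler = (proxy, method, methodArgs) ->
        "hello " + methodArgs[0] + " (via " + method.getName() + ")";

    Greeter g = (Greeter) Proxy.newProxyInstance(
        Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, handler);

    System.out.println(g.greet("hdfs"));   // prints: hello hdfs (via greet)
  }
}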

2 Server

The Server is created via getServer and follows the Reactor pattern:

  • RpcRequestWrapper and RpcResponseWrapper wrap the request and response;
  • RpcInvoker invokes the service method via reflection;
  • The Reactor is built on NIO and drives Accept, Read, and Write (a minimal reactor loop is sketched after the getServer code below).

public static class Server extends RPC.Server {
}
@Override
public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
    String bindAddress, int port, int numHandlers, int numReaders,
    int queueSizePerHandler, boolean verbose, Configuration conf,
    SecretManager<? extends TokenIdentifier> secretManager,
    String portRangeConfig, AlignmentContext alignmentContext)
    throws IOException {
  return new Server(protocol, protocolImpl, conf, bindAddress, port,
      numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
      portRangeConfig, alignmentContext);
}
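
A stripped-down, single-threaded illustration of the Reactor idea (hypothetical code; the port is made up). The real Server splits this work across a Listener thread that accepts, several Reader threads that read requests and enqueue them into the CallQueueManager, Handler threads that process them, and a Responder that writes replies.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class MiniReactor {
  public static void main(String[] args) throws IOException {
    Selector selector = Selector.open();
    ServerSocketChannel acceptor = ServerSocketChannel.open();
    acceptor.bind(new InetSocketAddress(18020));
    acceptor.configureBlocking(false);
    acceptor.register(selector, SelectionKey.OP_ACCEPT);

    while (true) {
      selector.select();
      for (SelectionKey key : selector.selectedKeys()) {
        if (key.isAcceptable()) {
          // Listener role: accept and hand the channel over for reading.
          SocketChannel ch = ((ServerSocketChannel) key.channel()).accept();
          ch.configureBlocking(false);
          ch.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) {
          // Reader role: read bytes, decode an RPC request, enqueue a Call.
          ByteBuffer buf = ByteBuffer.allocate(4096);
          int n = ((SocketChannel) key.channel()).read(buf);
          if (n < 0) {
            key.channel().close();
          }
          // ... decode the request from buf and enqueue it for a Handler ...
        }
      }
      selector.selectedKeys().clear();
    }
  }
}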

// hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
public abstract class Server {
/**
 * Constructs a server listening on the named port and address.  Parameters passed must
 * be of the named class.  The <code>handlerCount</code> determines
 * the number of handler threads that will be used to process calls.
 * If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
 * from configuration. Otherwise the configuration will be picked up.
 *
 * If rpcRequestClass is null then the rpcRequestClass must have been
 * registered via {@link #registerProtocolEngine(RPC.RpcKind,
 *  Class, RPC.RpcInvoker)}
 * This parameter has been retained for compatibility with existing tests
 * and usage.
 *
 * @param bindAddress input bindAddress.
 * @param port input port.
 * @param rpcRequestClass input rpcRequestClass.
 * @param handlerCount input handlerCount.
 * @param numReaders input numReaders.
 * @param queueSizePerHandler input queueSizePerHandler.
 * @param conf input Configuration.
 * @param serverName input serverName.
 * @param secretManager input secretManager.
 * @param portRangeConfig input portRangeConfig.
 * @throws IOException raised on errors performing I/O.
 */
@SuppressWarnings("unchecked")
protected Server(String bindAddress, int port,
    Class<? extends Writable> rpcRequestClass, int handlerCount,
    int numReaders, int queueSizePerHandler, Configuration conf,
    String serverName, SecretManager<? extends TokenIdentifier> secretManager,
    String portRangeConfig)
  throws IOException {
  this.bindAddress = bindAddress;
  this.conf = conf;
  this.portRangeConfig = portRangeConfig;
  this.port = port;
  this.rpcRequestClass = rpcRequestClass;
  this.handlerCount = handlerCount;
  this.socketSendBufferSize = 0;
  this.serverName = serverName;
  this.auxiliaryListenerMap = null;
  this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
      CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
  if (queueSizePerHandler != -1) {
    this.maxQueueSize = handlerCount * queueSizePerHandler;
  } else {
    this.maxQueueSize = handlerCount * conf.getInt(
        CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
        CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
  }
  this.maxRespSize = conf.getInt(
      CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
      CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
  if (numReaders != -1) {
    this.readThreads = numReaders;
  } else {
    this.readThreads = conf.getInt(
        CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
        CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
  }
  this.readerPendingConnectionQueue = conf.getInt(
      CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
      CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);

  // Setup appropriate callqueue
  final String prefix = getQueueClassPrefix();
  this.callQueue = new CallQueueManager<>(
      getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
      getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
      getClientBackoffEnable(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
      maxQueueSize, prefix, conf);

  this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
  this.authorize =
    conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
                    false);

  // configure supported authentications
  this.enabledAuthMethods = getAuthMethods(secretManager, conf);
  this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);

  // Start the listener here and let it bind to the port
  listener = new Listener(port);
  // set the server port to the default listener port.
  this.port = listener.getAddress().getPort();
  connectionManager = new ConnectionManager();
  this.rpcMetrics = RpcMetrics.create(this, conf);
  this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
  this.tcpNoDelay = conf.getBoolean(
      CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
      CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_DEFAULT);

  this.setLogSlowRPC(conf.getBoolean(
      CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
      CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));

  this.setPurgeIntervalNanos(conf.getInt(
      CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY,
      CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT));

  // Create the responder here
  responder = new Responder();

  if (secretManager != null || UserGroupInformation.isSecurityEnabled()) {
    SaslRpcServer.init(conf);
    saslPropsResolver = SaslPropertiesResolver.getInstance(conf);
  }

  this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
  this.exceptionsHandler.addTerseLoggingExceptions(
      HealthCheckFailedException.class);
  this.metricsUpdaterInterval =
      conf.getLong(CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL,
          CommonConfigurationKeysPublic.IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT);
  this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Hadoop-Metrics-Updater-%d")
          .build());
  this.scheduledExecutorService.scheduleWithFixedDelay(new MetricsUpdateRunner(),
      metricsUpdaterInterval, metricsUpdaterInterval, TimeUnit.MILLISECONDS);
}

public synchronized void addAuxiliaryListener(int auxiliaryPort)
    throws IOException {
  if (auxiliaryListenerMap == null) {
    auxiliaryListenerMap = new HashMap<>();
  }
  if (auxiliaryListenerMap.containsKey(auxiliaryPort) && auxiliaryPort != 0) {
    throw new IOException(
        "There is already a listener binding to: " + auxiliaryPort);
  }
  Listener newListener = new Listener(auxiliaryPort);
  newListener.setIsAuxiliary();

  // in the case of port = 0, the listener would be on a != 0 port.
  LOG.info("Adding a server listener on port " +
      newListener.getAddress().getPort());
  auxiliaryListenerMap.put(newListener.getAddress().getPort(), newListener);
}
}

2.1 Code Trace

  // NameNodeRpcServer.java
  public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException {
    ClientNamenodeProtocolServerSideTranslatorPB
        clientProtocolServerTranslator =
            new ClientNamenodeProtocolServerSideTranslatorPB(this);
    BlockingService clientNNPbService = ClientNamenodeProtocol.
        newReflectiveBlockingService(clientProtocolServerTranslator);

    InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
    if (serviceRpcAddr != null) {
      int serviceHandlerCount =
        conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                    DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
      serviceRpcServer = new RPC.Builder(conf)
          .setProtocol(
              org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
          .setInstance(clientNNPbService)
          .setBindAddress(bindHost)
          .setPort(serviceRpcAddr.getPort())
          .setNumHandlers(serviceHandlerCount)
          .setVerbose(false)
          .setSecretManager(namesystem.getDelegationTokenSecretManager())
          .build();

      // Update the address with the correct port
      InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress();
      serviceRPCAddress = new InetSocketAddress(
            serviceRpcAddr.getHostName(), listenAddr.getPort());
      nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
    }
  }

  // ClientNamenodeProtocolServerSideTranslatorPB.java
  @Override
  public CreateResponseProto create(RpcController controller,
      CreateRequestProto req) throws ServiceException {
    try {
      FsPermission masked = req.hasUnmasked() ?
          FsCreateModes.create(PBHelperClient.convert(req.getMasked()),
              PBHelperClient.convert(req.getUnmasked())) :
          PBHelperClient.convert(req.getMasked());
      HdfsFileStatus result = server.create(req.getSrc(),
          masked, req.getClientName(),
          PBHelperClient.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(),
          (short) req.getReplication(), req.getBlockSize(),
          PBHelperClient.convertCryptoProtocolVersions(
              req.getCryptoProtocolVersionList()),
          req.getEcPolicyName(), req.getStoragePolicy());

      if (result != null) {
        return CreateResponseProto.newBuilder().setFs(PBHelperClient.convert(result))
            .build();
      }
      return VOID_CREATE_RESPONSE;
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

// hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
// Optional hop: with Router-Based Federation (RBF), the router forwards the call.
@Override // ClientProtocol
  public HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
      String storagePolicy)
      throws IOException {
    return clientProto.create(src, masked, clientName, flag, createParent,
        replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
  }

  // hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  @Override // ClientProtocol
  public HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
      String storagePolicy)
      throws IOException {
    checkNNStartup();
    String clientMachine = getClientMachine();
    if (stateChangeLog.isDebugEnabled()) {
      stateChangeLog.debug("*DIR* NameNode.create: file "
          +src+" for "+clientName+" at "+clientMachine);
    }
    if (!checkPathLength(src)) {
      throw new IOException("create: Pathname too long.  Limit "
          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return (HdfsFileStatus) cacheEntry.getPayload();
    }

    HdfsFileStatus status = null;
    try {
      PermissionStatus perm = new PermissionStatus(getRemoteUser()
          .getShortUserName(), null, masked);
      status = namesystem.startFile(src, perm, clientName, clientMachine,
          flag.get(), createParent, replication, blockSize, supportedVersions,
          ecPolicyName, storagePolicy, cacheEntry != null);
    } finally {
      RetryCache.setState(cacheEntry, status != null, status);
    }

    metrics.incrFilesCreated();
    metrics.incrCreateFileOps();
    return status;
  }
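
The RetryCache.waitForCompletion / setState pair above makes the non-idempotent create safe to retry: a replayed call with the same client id and call id gets the cached HdfsFileStatus back instead of creating the file twice. A simplified, hypothetical sketch of that pattern:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class MiniRetryCache {
  static final class Entry {
    volatile boolean success;
    volatile Object payload;
  }

  private final Map<String, Entry> cache = new ConcurrentHashMap<>();

  // callKey stands in for (clientId, callId); the real RetryCache also expires
  // entries and coordinates concurrent in-progress calls.
  Object execute(String callKey, Supplier<Object> op) {
    Entry prev = cache.get(callKey);
    if (prev != null && prev.success) {
      return prev.payload;                 // replayed retry: short-circuit
    }
    Entry entry = new Entry();
    Object result = null;
    try {
      result = op.get();
      return result;
    } finally {
      entry.success = (result != null);    // mirrors RetryCache.setState(...)
      entry.payload = result;
      cache.put(callKey, entry);
    }
  }
}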

// hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
/**
   * Create a new file entry in the namespace.
   *
   * For description of parameters and exceptions thrown see
   * {@link ClientProtocol#create}, except it returns valid file status upon
   * success
   */
  HdfsFileStatus startFile(String src, PermissionStatus permissions,
      String holder, String clientMachine, EnumSet<CreateFlag> flag,
      boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
      String storagePolicy, boolean logRetryCache) throws IOException {

    HdfsFileStatus status;
    try {
      status = startFileInt(src, permissions, holder, clientMachine, flag,
          createParent, replication, blockSize, supportedVersions, ecPolicyName,
          storagePolicy, logRetryCache);
    } catch (AccessControlException e) {
      logAuditEvent(false, "create", src);
      throw e;
    }
    logAuditEvent(true, "create", src, status);
    return status;
  }

3 References