package org.apache.hadoop.filecache;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.TaskDistributedCacheManager.CacheFile;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.TaskController;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.RunJar;

/**
* Manages a single machine's instance of a cross-job
* cache. This class would typically be instantiated
* by a TaskTracker (or something that emulates it,
* like LocalJobRunner).
*
* <b>This class is internal to Hadoop, and should not be treated as a public
* interface.</b>
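 *
 * <p>A minimal construction sketch; {@code DefaultTaskController} is just
 * one possible controller, and callers such as LocalJobRunner may wire
 * this up differently:
 * <pre>{@code
 *   Configuration conf = new Configuration();
 *   TaskController controller = new DefaultTaskController();
 *   controller.setConf(conf);
 *   TrackerDistributedCacheManager manager =
 *       new TrackerDistributedCacheManager(conf, controller);
 * }</pre>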
*/
public class TrackerDistributedCacheManager {
// cacheID to cacheStatus mapping
private LinkedHashMap<String, CacheStatus> cachedArchives =
new LinkedHashMap<String, CacheStatus>();
private Map<JobID, TaskDistributedCacheManager> jobArchives =
Collections.synchronizedMap(
new HashMap<JobID, TaskDistributedCacheManager>());
private final TaskController taskController;
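  // objects in the public cache are world-readable: 0755 = rwxr-xr-x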
private static final FsPermission PUBLIC_CACHE_OBJECT_PERM =
FsPermission.createImmutable((short) 0755);
// default total cache size (10GB)
private static final long DEFAULT_CACHE_SIZE = 10737418240L;
private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
private static final float DEFAULT_CACHE_KEEP_AROUND_PCT = 0.95f;
private long allowedCacheSize;
private long allowedCacheSubdirs;
private long allowedCacheSizeCleanupGoal;
private long allowedCacheSubdirsCleanupGoal;
private static final Log LOG =
LogFactory.getLog(TrackerDistributedCacheManager.class);
private final LocalFileSystem localFs;
private LocalDirAllocator lDirAllocator;
private Configuration trackerConf;
private static final Random random = new Random();
protected BaseDirManager baseDirManager = new BaseDirManager();
protected CleanupThread cleanupThread;
public TrackerDistributedCacheManager(Configuration conf,
TaskController controller
) throws IOException {
this.localFs = FileSystem.getLocal(conf);
this.trackerConf = conf;
this.lDirAllocator = new LocalDirAllocator("mapred.local.dir");
// setting the cache size to a default of 10GB
    this.allowedCacheSize = conf.getLong(
        "local.cache.size", DEFAULT_CACHE_SIZE);
// setting the cache number of subdirectories limit to a default of 10000
    this.allowedCacheSubdirs = conf.getLong(
        "mapreduce.tasktracker.local.cache.numberdirectories",
        DEFAULT_CACHE_SUBDIR_LIMIT);
double cleanupPct = conf.getFloat("mapreduce.tasktracker.cache.local.keep.pct",
DEFAULT_CACHE_KEEP_AROUND_PCT);
this.allowedCacheSizeCleanupGoal =
(long)(this.allowedCacheSize * cleanupPct);
this.allowedCacheSubdirsCleanupGoal =
(long)(this.allowedCacheSubdirs * cleanupPct);
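    // e.g. with the defaults above, cleanup aims to trim the cache back to
    // 0.95 * 10GB (~9.5GB) and 0.95 * 10000 = 9500 subdirectories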
this.taskController = controller;
this.cleanupThread = new CleanupThread(conf);
  }

/**
   * Get the locally cached file or archive; return it if it was previously
   * cached (and is still valid), or copy it from the {@link FileSystem} now.
*
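   * <p>An illustrative call, localizing a private jar; {@code manager},
   * {@code fileStatus}, {@code confFileStamp} and {@code cacheFile} are
   * assumed to be in scope, and the subDir shown is hypothetical:
   * <pre>{@code
   *   URI cache = new URI("hdfs://namenode:8020/apps/lib/app.jar#app.jar");
   *   Path unpackDir = manager.getLocalCache(cache, conf,
   *       "taskTracker/someuser/distcache", fileStatus,
   *       true, confFileStamp, false, cacheFile);
   * }</pre>
   *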
   * @param cache the cache to be localized; specify it as
   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME)
   * @param conf the Configuration which identifies the filesystem to copy from
   * @param subDir the base cache subdirectory under which to localize the
   * files/archives
   * @param fileStatus the file status of the cache object on the dfs
   * @param isArchive whether the cache object is an archive or a plain file.
   * An archive with a .zip, .jar, .tar, .tgz or .tar.gz extension is
   * unzipped/unjarred/untarred automatically, and the directory holding the
   * unpacked contents is returned as the Path.
   * For a plain file, the path to the local copy is returned.
   * @param confFileStamp the HDFS modification timestamp of the file, used
   * to verify that the file to be cached hasn't changed since the job started
   * @param isPublic whether the cache file is publicly accessible or private
   * @param file the CacheFile that will track this localized cache object
   * @return the path to the directory where an archive is unpacked, or the
   * path to which the file was copied locally
* @throws IOException
*/
Path getLocalCache(URI cache, Configuration conf,
String subDir, FileStatus fileStatus,
boolean isArchive, long confFileStamp,
boolean isPublic, CacheFile file) throws IOException {
    String user = getLocalizedCacheOwner(isPublic);
    String key = getKey(cache, conf, confFileStamp, user, isArchive);
CacheStatus lcacheStatus;
Path localizedPath = null;
Path localPath = null;
synchronized (cachedArchives) {
lcacheStatus = cachedArchives.get(key);
if (lcacheStatus == null) {
// was never localized
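        // build a per-localization unique directory name so that two
        // versions of the same cache object never collide on disk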
String uniqueString
= (String.valueOf(random.nextLong())
+ "_" + cache.hashCode()
+ "_" + (confFileStamp % Integer.MAX_VALUE));
        String cachePath = new Path(subDir,
new Path(uniqueString, makeRelative(cache, conf))).toString();
localPath = lDirAllocator.getLocalPathForWrite(cachePath,
fileStatus.getLen(), trackerConf, isPublic);
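        // derive the cache base directory by stripping the per-cache
        // suffix from the allocated local path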
lcacheStatus =
new CacheStatus(new Path(localPath.toString().replace(cachePath, "")),
localPath, new Path(subDir), uniqueString,
isPublic ? null : user, key);
cachedArchives.put(key, lcacheStatus);
}
      // mark the cache as in use: record the status on the CacheFile and
      // bump the reference count so the cleanup thread won't delete it
file.setStatus(lcacheStatus);
lcacheStatus.incRefCount();
}
try {
// do the localization, after releasing the global lock
synchronized (lcacheStatus) {
if (!lcacheStatus.isInited()) {
if (isPublic) {
localizedPath = localizePublicCacheObject(conf,
cache,
confFileStamp,
lcacheStatus, fileStatus,
isArchive);
} else {
localizedPath = localPath;
if (!isArchive) {
            // for a private file the length is known here; for private
            // archives the lengths come over RPC from the JobLocalizer,
            // since the JobLocalizer is the one who expands archives and
            // computes the total length
lcacheStatus.size = fileStatus.getLen();
// Increase the s