精通IPFS:IPFS 获取内容之上篇
QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ
,下载它的示例代码如下:
const {createNode} = require('ipfs')
const fs = require('fs');
const node = createNode()
node.on('ready', async () => { const file = await node.get('QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ/cat.jpg')
fs.writeFile('cat.jpg',file[0].content,(err) => {
if (err) throw err; console.log('cat saved!'); }); }) 运行这段代码,就会在当前目录下生成喵星人,让我们先来欣赏下美丽的喵星人吧!
get
方法,这个方法位于 IPFS 的
core/components/files-regular/get.js
文件,它的内容如下:
(ipfsPath, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
} options = options || {}
pull(
self.getPullStream(ipfsPath, options),
pull.asyncMap((file, cb) => {
if (file.content) {
pull(
file.content,
pull.collect((err, buffers) => {
if (err) { return cb(err) }
file.content = Buffer.concat(buffers)
cb(null, file)
})
)
} else {
cb(null, file)
}
}),
pull.collect(callback)
)
}
这个匿名函数接收我们传递给它的图片路径,通过 pull-stream 类库的
pull
函数,调用 IPFS 对象的
getPullStream
方法 获取文件,并在获取到文件之后,调用
pull.asyncMap
流对获取到的文件进行处理,最后把最终的文件传递给
pull.collect
流进行处理,后者直接调用我们提供的回调函数把最终的文件交给用户来处理。
通过上面的简单分析,我们可以发现从 IPFS 网络中获取文件主要是通过 IPFS 对象的
getPullStream
方法,而这个方法是在创建 IPFS 对象的过程中在
core/index.js
文件中被注册为 IPFS 对象的一个方法,它的主要内容就是返回一个 pull-stream 类库的流,代码如下:
pull(
exporter(ipfsPath, self._ipld, options),
pull.map(file => {
file.hash = file.cid.toString()
delete file.cid
return file
})
)
上面的代码定义在
core/components/files-regular/get-pull-stream.js
文件,主体也是 pull-stream 类库的
pull
函数,第一个参数是调用
ipfs-unixfs-exporter
类库函数返回的内部流,这个内部流会从 IPFS 网络中获取我们想要的文件,第二个参数是 pull-stream 类库的
map
流,它根据获取到的文件对象的 CID 生成文件的哈希。
接下来,我们开始看下
exporter
函数,它的代码位于 ipfs-unixfs-exporter 类库的
index.js
文件中。它的执行逻辑如下:
-
对传递进来的路径进行处理,返回一个对象。返回的对象中包含了基本路径和中间路径。
这里我们传递进来的路径是let dPath try { dPath = pathBaseAndRest(path) } catch (err) { return error(err) }
QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ/cat.jpg
,所以dPath
对象的基本路径就是QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ
,中间路径的数组为空。 -
获取不包括最终名称在内的路径长度
const pathLengthToCut = join([dPath.base].concat(dPath.rest.slice(0, dPath.rest.length - 1))).length
-
根据基本路径生成 CID 对象。
CID 对象包括四个部分:multibase、版本号、multicodec、multihash。当我们传递const cid = new CID(dPath.base)
QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ
到 CID 构造函数中时,函数内部会设置版本号为 0,multicodec 为dag-pb
,同时调用多重哈希函数把 Base58 字符串转化为 multihash。 -
最后,使用
pull
函数返回 pull-stream 类库中使用的流。
返回的这个流被外面return pull( values([{ cid, name: dPath.base, path: dPath.base, pathRest: dPath.rest, depth: 0 }]), createResolver(dag, options), filter(Boolean), map((node) => { return { depth: node.depth, name: node.name, path: options.fullPath ? node.path : finalPathFor(node), size: node.size, cid: node.cid, content: node.content, type: node.type } }) )
get-pull-stream.js
文件中的pull
函数所调用,从而从 IPFS 网络中获取指定的文件。在上面的代码简单解释如下:
-
values
函数是 pull-stream 类库中定义的source
流,它会创建一个从数组或对象读取值,然后终止的源流,这里我们用传递进来的路径及其生成 CID 生成一个对象,并生成一个数组来做为流的内容。 -
createResolver
函数是当前目录下resolve.js
文件中的createResolver
函数,它接收 dag 对象,并返回一个 pull-stream 流对象,这里 dag 对象是 IPFS 对象中的_ipld
对象。这里返回的流对象,从前面一个流中读取要获取的文件,然后调用 dag 对象的get
方法来获取指定的文件,具体处理下面进行分析。 -
map
函数是 pull-stream 类库中定义的through
流,它使用用户指定的转换函数对数组中的每个元素进行转换。这里的处理函数比较简单,根据前面流返回的对象,生成并返回另一个对象。这里返回的对象,就是我们最终在示例程序中看到的对象,除了它没有hash
属性,并且cid
被删除。
-
createResolver
这个函数,它定义于 ipfs-unixfs-exporter 类库的
resolve.js
文件中,它的主体是生成并返回一个 pull-stream 类库中使用
pull
函数,代码具体如下:
pull(
paramap((item, cb) => {
if ((typeof item.depth) !== 'number') {
return error(new Error('no depth'))
} if (item.object) {
return cb(null, resolveItem(null, item.object, item, options))
}
waterfall([
function (done) {
dag.get(item.cid, done)
},
function (node, done) {
// node 为区块对象反序列化后的结果,可能为文件总的 Dag,也可能为某个文件(在不分块的情况下)。
done(null, resolveItem(item.cid, node.value, item, options))
}
], cb)
}),
flatten(),
filter(Boolean),
filter((node) => node.depth <= options.maxDepth)
)
上面代码简单解释如下:
-
首先,调用
paramap
函数,返回 pull-paramap 流,在这个流中使用异步类库的waterfall
方法,依次调用 IPLD 对象的get
方法,从本地或其他节点获取区块对象;在得到区块对象之后,调用resolveItem
方法,处理得到的区块对象(这里得到的区块对象,可能为一个完整的文件,也可能是文件的一个碎片,还可能是一个目录等)。pull-paramap 流是一个 pull-stream 流,它接收 3个参数,第一个参数类型为函数,函数签名为
这里(data, cb)
,在函数中执行用户自定义的业务逻辑,第二个和第三参数都是可选的。pull-paramap 并行地从前面的流中读取数据,调用第一个参数指定函数进行处理,把函数调用结果以数组的形式返回给后面的流。数组中结果的顺序与前面的流提供的源数据顺序保持一致。
paramap
函数的异步处理函数内容如下:-
检查当前对象的
depth
属性是否不是数字,如果不是数字,则返回错误。这里当前对象即为前面values
流中生成并返回的对象。 -
如果当前对象的
object
属性存在,则调用resolveItem
解释当前对象,并把结果传递给下一个函数。我们的对象没有object
属性,所以这里的代码不会执行。 -
调用异步类库的
waterfall
方法,依次调用 IPLD 对象的get
方法,从本地或其他节点获取区块对象;在得到区块对象之后,调用resolveItem
方法,处理得到的区块对象。
-
检查当前对象的
-
然后,调用 pull-stream 类库的
flatten
和filter
两个 through 流进返回的区块对象进行处理。 -
最后,调用 pull-stream 类库的
filter
流,过滤超过指定深度区块对象。
1、获取区块对象
上面的代码,我们是通过调用 dag 对象的
get
方法来获取区块对象,它的执行逻辑如下:
dag 对象的类型是 ipld 类库中的
IPLDResolver
对象,在 IPFS 对象初始化时生成并设置在 IPFS 对象上面。
-
如果路径参数类型为函数,则重新设置参数。根据上面调用,我们这里的路径参数是
waterfall
提供的内部函数done
,所以会执行下面的代码重新设置下面两个变量。if (typeof path === 'function') { callback = path path = undefined }
-
如果选项参数为函数,则重新设置参数。根据上面的调用,这里没有选项参数,所以不会执行下面的代码。
if (typeof options === 'function') { callback = options options = {} }
-
处理路径参数
if (typeof path === 'string') { path = joinPath('/', path) .substr(1) .split(osPathSep) .join('/') }
-
如果路径参数为空串或没有定义,则调用内部函数
_get
进行处理,并在它的异步回调函数中返回其结果。内部函数_get
内部通过waterfall
函数来处理,具体代码如下:
在waterfall([ (cb) => this._getFormat(cid.codec, cb), (format, cb) => this.bs.get(cid, (err, block) => { if (err) return cb(err) cb(null, format, block) }), (format, block, cb) => { format.util.deserialize(block.data, (err, deserialized) => { if (err) { return cb(err) } cb(null, deserialized) }) } ], callback)
waterfall
函数内部,首先调用_getFormat
方法,根据 CID 对象来获取其所对用的格式化对象;然后调用区块服务对象的get
方法来获得区块对象;最后,使用格式化对象的工具对象的反序列化方法,反序列化区块服务对象获得的区块数据。区块服务对象位于 ipfs-block-service 类库的
index.js
文件中,它的get
方法,根据是否有 bitswap 对象决定是从 bitswap 对象获取区块对象,还是从本地仓库中获取。它的代码如下:
当系统启动过程中,在处理get (cid, callback) { if (this.hasExchange()) { this._bitswap.get(cid, callback) } else { this._repo.blocks.get(cid, callback) } }
init-docs
目录内的帮助文档时,bitswap 对象才有空,即只有这个过程才会直接从本地仓库保存/获取区块对象,其他情况都是调用 bitswap 对象的get
方法来获取区块对象。bitswap 对象的
get
方法委托自身的getMany
方法进行处理。后者的处理过程如下:-
初始化内部所用的变量:
wantList
数组为空,promptedNetwork
为假,pendingStart
为请求的所有 CID 数量。 -
生成一个从其他节点获取区块对象的函数对象
getFromOutside
。 -
调用异步类库的
map
函数,遍历每一个要请求的区块对象。针对每一个要请求的区块,使用异步类库的waterfall
函数进行处理。waterfall
函数处理如下:-
调用区块存储对象的
has
方法,检查本地是否有请求的区块; -
如果本地有请求的区块,则:如果已经处理完所有请求的 CID,那么调用 WantManager 的
wantBlocks
方法获取需要的区块;调用区块存储对象的get
方法从本地加载区块并返回。 -
如果内部变量
promptedNetwork
为假,则:设置这个变量为真(保证只有一个请求可以马上处理);调用网络对象的findAndConnect
方法,查找第一个请求的 CID。 -
调用函数对象
getFromOutside
进行处理。这个函数把指定的 CID 放入wantList
数组中,然后调用notifications
对象的wantBlock
方法通知系统,我们想要这个区块;如果已经处理完所有请求的 CID,那么调用 WantManager 的wantBlocks
方法获取需要的区块。
-
调用区块存储对象的
notifications
对象是一个内部模块,用来跟踪收到的区块、想要的区块、不想要的区块等。这个函数的第一个参数就是请求的区块 CID,第二个参数是一个函数,用来在收到一个区块后,从想要列表中取消这个区块,避免再次请求别的节点,第三个参数用来取消请求某个区块。 -
初始化内部所用的变量:
-
当路径参数不为空串,并且有值时,调用异步类库的
doUntil
函数进行处理。doUntil
函数内部的业务处理与上面基本类似,读者可以自己分析。
2、解析区块对象
当调用 dag 对象的
get
方法获取到区块对象之后,是不是处理流程已经结束了?不是这样的,我们可以想像下,Unix 文件系统是一个树状的结构,从根目录
/
开始,然后是子目录,子目录下面又可以是孙目录或文件,孙目录下面又可以重孙目录或文件,子子孙孙无穷尽也。除了目录之外,当我们要获取的文件如果很大,在上传时候也会被切分类似目录树结构的结构,最顶层为文件总的 DAGNode 对象,它通过 DAGLink 连接到子碎片,子碎片又可以通过 DAGLink 连接到孙碎片,孙碎片又可以通过 DAGLink 连接到重孙碎片,子子孙孙无穷尽也。所以获取到区块对象之后,异步类库的
waterfall
函数通过调用它的第二个参数,从而调用
resolveItem
函数来解析获取到的区块对象,根据获取到的区块来获取完整的区块对象。
resolveItem
函数实现为直接委托给另一个函数
resolve
进行处理,代码如下:
function resolveItem (cid, node, item, options) {
return resolve({
cid,
node,
name: item.name,
path: item.path,
pathRest: item.pathRest,
dag,
parentNode: item.parent || parent,
depth: item.depth,
options
})
}
resolve
函数处理过程如下:
-
调用函数
typeOf
检测区块对象的真实类型。
区块对象可能为 directory、hamt-sharded-directory、file、object、raw,每种类型都有特定的处理器进行解析。try { type = typeOf(node) } catch (err) { return error(err) }
-
从
resolvers
对象中获取对应类型的解析器const nodeResolver = resolvers[type]
resolvers
对象定义在文件开头,内容如下:const resolvers = { directory: require('./dir-flat'), 'hamt-sharded-directory': require('./dir-hamt-sharded'), file: require('./file'), object: require('./object'), raw: require('./raw') }
-
调用
createResolver
函数,创建resolveDeep
。 -
调用
nodeResolver
函数,解析指定的区块对象。对于不同的类型,nodeResolver
函数是不同的。当获取的区块对象类型为目录时,函数为dir-flat.js
中定义的dirExporter
函数;当获取的区块对象类型为文件时,函数为file.js
文件中定义的函数;当获取的区块对象类型为对象时,函数object.js
文件中定义的函数。当我们获取喵星人时,涉及到两种类型,即目录和文件,下面我们就以这两种类型进行分析。dirExporter
函数执行过程如下:-
设置要获取的第一个对象
const accepts = pathRest[0]
pathRest
是我们在调用get
方法时,根据提供的路径生成的路径数组,数组中不包括路径的基础部分。对于我们的例子,数组只有一个元素,即cat.jpg
。 -
生成一个代表当前目录的变量。
const dir = { name: name, depth: depth, path: path, cid, size: 0, type: 'dir' }
-
如果当前获取对象的深度超过了选项指定的最大深度,则返回 pull-stream 类库的源流
values
。if (options.maxDepth && options.maxDepth <= depth) { return values([dir]) }
-
生成一个流数组。
上面生成的流数组,在函数尾部会通过 pull-cat 类库进行级连调用,上面代码最终执行过程描述如下:遍历当前目录的所有连接;除非掉不属于当前请求的连接;把所有符合条件的连接生成对应的对象;最后,把生成的对象传递给const streams = [ pull( values(node.links), filter((item) => accepts === undefined || item.name === accepts), map((link) => ({ depth: depth + 1, size: 0, name: link.name, path: path + '/' + link.name, cid: link.cid, linkName: link.name, pathRest: pathRest.slice(1), type: 'dir' })), resolve ) ]
resolve
流,即调用createResolver
函数返回的流,这个流我们在前面已经分析过,这里不再细讲。在我们获取喵星人的例子当中,我们指定的路径为
QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ/cat.jpg
,/
前面的路径为基本路径,它代表了一个目录,这个目录中有一个文件,这个文件即是喵星人,也即node.links
指向的是喵星人的 CID。通过 pull-cat 类库的调用,在createResolver
函数返回的流中我们会真正请求喵星人,具体文件的处理,我们在下面进行分析。 -
如果路径中除了基本路径之外没有别的路径,或者选项指定为完全路径,则使用
dir
变量生成一个values
流,并放在流数组streams
的最前面。 - 调用 pull-cat 类库进行流处理,上体处理过程见前面描述。
下面我们来看 IPFS 是如何具体文件的,正如我们在前面所提到的,每个文件在保存到 IPFS 网络中都可能进行分片,即把大的文件分成小的碎片,每个碎片有自己的哈希,根据碎片的哈希生成对应的 DAGLink,以碎片在文件中出现的顺序,使用这些 DAGLink 生成连接数组,使用连接数组生成最终的顶层 DAGNode 对象,以此来表示文件。我们的喵星人同样也被分成了两个碎片,在前面分析中,请求目录之后,通过 pull-cat 类库的调用,再次请求
createResolver
函数返回的流的过程中,我们会请求喵星人总的 DAGNode 对象,当调用nodeResolver
函数时,这次会选择file.js
文件进行请求处理,它的执行过程如下:-
设置要获取的第一个对象
这次const accepts = pathRest[0]
if (accepts !== undefined && accepts !== path) { return empty() }
pathRest
数组为空,所以这里accepts
是未定义。 -
调用
UnixFS
的静态方法unmarshal
方法,从区块对象的data
属性中解组出 Uninx 文件对象。try { file = UnixFS.unmarshal(node.data) } catch (err) { return error(err) }
-
获取文件大小、指定的长度和偏移量
const fileSize = file.fileSize()
let offset = options.offset let length = options.length
if (offset < 0) { return error(new Error('Offset must be greater than or equal to 0')) }
if (offset > fileSize) { return error(new Error('Offset must be less than the file size')) }
if (length < 0) { return error(new Error('Length must be greater than or equal to 0')) }
-
如果长度为 0,则生成并返回 pull-stream 类库的
once
流。if (length === 0) { return once({ depth: depth, content: once(Buffer.alloc(0)), name: name, path: path, cid, size: fileSize, type: 'file' }) }
-
重新计算偏移量和文件长度。
if (!offset) { offset = 0 }
if (!length || (offset + length > fileSize)) { length = fileSize - offset }
-
调用
streamBytes
函数,根据偏移量、长度及节点的连接数组,获取指定的内容。streamBytes
函数采用深度优先算法获取区块对象的所有碎片数据,它的结果是一个 pull-stream 类库的 through 流。代码如下:
pull-travers 类库中提供了深度优先、广度优先、叶子优先3种算法来遍历一颗树,这里我们使用了深度优先来遍历文件的所有碎片。if (offset === fileSize || length === 0) { return once(Buffer.alloc(0)) }
const end = offset + length
return pull( traverse.depthFirst({ node, start: 0, end: fileSize }, getChildren(dag, offset, end)), map(extractData(offset, end)), filter(Boolean) )
-
生成并返回 pull-stream 类库的
values
流。返回的流在依次被用在resolver.js
的createResolver
函数返回的流中,后者又被 ipfs-unixfs-exporter 类库中的pull
函数中的map
流所使用;ipfs-unixfs-exporter 类库中的pull
函数中的map
流又被get-pull-stream.js
文件中的pull.map
所使用,并且最终被get.js
文件中的pull.asyncMap
流的处理函数转换为 Buffer 对象,从而我们的程序从 Buffer 对象中读取出文件内容。
-
设置要获取的第一个对象
CARV Boosts Growth in Decentralized Security by Partnering with ARPA Network
CARV and ARPA Network collaborating to boost decentralized security growth with verifiable randomnes...
Fuse and QuickNode Partner to Elevate Web3 Payments
Fuse Network and QuickNode launch Fuse Ember L2, a zkEVM-powered solution optimizing Web3 payments w...
XRP Price Breakdown below $2: Analyst Reveals Next Major Support
Crypto analyst MadWhale has raised the possibility of the XRP price experiencing a breakdown below t...